Skip to content

Commit 11d0e0a

Browse files
fix(composer): fix version check logic for 'airflow_db_cleanup.py' (#13295)
* fix(composer): WIP fix version check logic - Create a function to convert version string to list of semantic versioning - Create unit test to validate a few scenarios * fix(composer): add TODO for developer * fix(composer): move the version parsing to a separate function * fix(composer): apply linting fixes * fix(composer): remove unnecessary argument
1 parent 36b8bb6 commit 11d0e0a

File tree

4 files changed

+71
-31
lines changed

4 files changed

+71
-31
lines changed

composer/workflows/airflow_db_cleanup.py

Lines changed: 53 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
# Note: This sample is designed for Airflow 1 and 2.
16+
1517
# [START composer_metadb_cleanup]
16-
"""
17-
A maintenance workflow that you can deploy into Airflow to periodically clean
18+
"""A maintenance workflow that you can deploy into Airflow to periodically clean
1819
out the DagRun, TaskInstance, Log, XCom, Job DB and SlaMiss entries to avoid
1920
having too much data in your Airflow MetaStore.
2021
@@ -68,33 +69,60 @@
6869
from sqlalchemy import desc, sql, text
6970
from sqlalchemy.exc import ProgrammingError
7071

72+
73+
def parse_airflow_version(version: str) -> tuple[int, ...]:
    """Parse an Airflow version string into a tuple of integers.

    Strips the Cloud Composer "+composer" build suffix if present, then
    splits the remainder on "." and converts each component to ``int`` so
    versions compare correctly as tuples (e.g. ``(2, 10, 5) > (2, 9, 1)``,
    whereas the strings compare lexicographically: ``"2.10.5" < "2.9.1"``).

    Args:
        version: Version string, e.g. "2.9.2+composer" or "2.9.2".

    Returns:
        Tuple of integer version components, e.g. ``(2, 9, 2)``.

    Raises:
        ValueError: If any dot-separated component is not a plain integer.
    """
    # TODO(developer): Update this function if you are using a version
    # with non-numerical characters such as "2.9.3rc1".
    version_without_suffix = version.removesuffix("+composer")
    return tuple(int(part) for part in version_without_suffix.split("."))
84+
85+
7186
now = timezone.utcnow
7287

7388
# airflow-db-cleanup
7489
DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")
90+
7591
START_DATE = airflow.utils.dates.days_ago(1)
76-
# How often to Run. @daily - Once a day at Midnight (UTC)
92+
93+
# How often to Run. @daily - Once a day at Midnight (UTC).
7794
SCHEDULE_INTERVAL = "@daily"
78-
# Who is listed as the owner of this DAG in the Airflow Web Server
95+
96+
# Who is listed as the owner of this DAG in the Airflow Web Server.
7997
DAG_OWNER_NAME = "operations"
80-
# List of email address to send email alerts to if this job fails
98+
99+
# List of email address to send email alerts to if this job fails.
81100
ALERT_EMAIL_ADDRESSES = []
82-
# Airflow version used by the environment in list form, value stored in
83-
# airflow_version is in format e.g "2.3.4+composer"
84-
AIRFLOW_VERSION = airflow_version[: -len("+composer")].split(".")
85-
# Length to retain the log files if not already provided in the conf. If this
86-
# is set to 30, the job will remove those files that arE 30 days old or older.
101+
102+
# Airflow version used by the environment as a tuple of integers.
103+
# For example: (2, 9, 2)
104+
#
105+
# Value in `airflow_version` is in format e.g. "2.9.2+composer"
106+
# It's converted to facilitate version comparison.
107+
AIRFLOW_VERSION = parse_airflow_version(airflow_version)
108+
109+
# Length to retain the log files if not already provided in the configuration.
110+
# If this is set to 30, the job will remove those files
111+
# that are 30 days old or older.
87112
DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int(
88113
Variable.get("airflow_db_cleanup__max_db_entry_age_in_days", 30)
89114
)
90-
# Prints the database entries which will be getting deleted; set to False
91-
# to avoid printing large lists and slowdown process
115+
116+
# Prints the database entries which will be getting deleted;
117+
# set to False to avoid printing large lists and slowdown the process.
92118
PRINT_DELETES = False
93-
# Whether the job should delete the db entries or not. Included if you want to
94-
# temporarily avoid deleting the db entries.
119+
120+
# Whether the job should delete the DB entries or not.
121+
# Included if you want to temporarily avoid deleting the DB entries.
95122
ENABLE_DELETE = True
96-
# List of all the objects that will be deleted. Comment out the DB objects you
97-
# want to skip.
123+
124+
# List of all the objects that will be deleted.
125+
# Comment out the DB objects you want to skip.
98126
DATABASE_OBJECTS = [
99127
{
100128
"airflow_db_model": DagRun,
@@ -105,9 +133,7 @@
105133
},
106134
{
107135
"airflow_db_model": TaskInstance,
108-
"age_check_column": TaskInstance.start_date
109-
if AIRFLOW_VERSION < ["2", "2", "0"]
110-
else TaskInstance.start_date,
136+
"age_check_column": TaskInstance.start_date,
111137
"keep_last": False,
112138
"keep_last_filters": None,
113139
"keep_last_group_by": None,
@@ -122,7 +148,7 @@
122148
{
123149
"airflow_db_model": XCom,
124150
"age_check_column": XCom.execution_date
125-
if AIRFLOW_VERSION < ["2", "2", "5"]
151+
if AIRFLOW_VERSION < (2, 2, 5)
126152
else XCom.timestamp,
127153
"keep_last": False,
128154
"keep_last_filters": None,
@@ -144,15 +170,15 @@
144170
},
145171
]
146172

147-
# Check for TaskReschedule model
173+
# Check for TaskReschedule model.
148174
try:
149175
from airflow.models import TaskReschedule
150176

151177
DATABASE_OBJECTS.append(
152178
{
153179
"airflow_db_model": TaskReschedule,
154180
"age_check_column": TaskReschedule.execution_date
155-
if AIRFLOW_VERSION < ["2", "2", "0"]
181+
if AIRFLOW_VERSION < (2, 2, 0)
156182
else TaskReschedule.start_date,
157183
"keep_last": False,
158184
"keep_last_filters": None,
@@ -163,7 +189,7 @@
163189
except Exception as e:
164190
logging.error(e)
165191

166-
# Check for TaskFail model
192+
# Check for TaskFail model.
167193
try:
168194
from airflow.models import TaskFail
169195

@@ -180,8 +206,8 @@
180206
except Exception as e:
181207
logging.error(e)
182208

183-
# Check for RenderedTaskInstanceFields model
184-
if AIRFLOW_VERSION < ["2", "4", "0"]:
209+
# Check for RenderedTaskInstanceFields model.
210+
if AIRFLOW_VERSION < (2, 4, 0):
185211
try:
186212
from airflow.models import RenderedTaskInstanceFields
187213

@@ -198,7 +224,7 @@
198224
except Exception as e:
199225
logging.error(e)
200226

201-
# Check for ImportError model
227+
# Check for ImportError model.
202228
try:
203229
from airflow.models import ImportError
204230

@@ -216,7 +242,7 @@
216242
except Exception as e:
217243
logging.error(e)
218244

219-
if AIRFLOW_VERSION < ["2", "6", "0"]:
245+
if AIRFLOW_VERSION < (2, 6, 0):
220246
try:
221247
from airflow.jobs.base_job import BaseJob
222248

@@ -530,5 +556,4 @@ def analyze_db():
530556

531557
print_configuration.set_downstream(cleanup_op)
532558
cleanup_op.set_downstream(analyze_op)
533-
534559
# [END composer_metadb_cleanup]

composer/workflows/airflow_db_cleanup_test.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,23 @@
1515

1616
import internal_unit_testing
1717

18+
from . import airflow_db_cleanup
1819

19-
def test_dag_import(airflow_database):
20+
21+
def test_version_comparison():
    """Validate the tuple-based version check logic used in the sample."""
    # b/408307862 - Validate version check logic used in the sample.
    composer_version = airflow_db_cleanup.parse_airflow_version("2.10.5+composer")
    assert composer_version == (2, 10, 5)
    assert composer_version > (2, 9, 1)

    plain_version = airflow_db_cleanup.parse_airflow_version("2.9.2")
    assert plain_version == (2, 9, 2)
    assert plain_version < (2, 9, 3)
32+
33+
34+
def test_dag_import():
2035
"""Test that the DAG file can be successfully imported.
2136
2237
This tests that the DAG can be parsed, but does not run it in an Airflow

composer/workflows/noxfile_config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
"3.10",
4040
"3.12",
4141
"3.13",
42-
], # Composer w/ Airflow 2 only supports Python 3.8
42+
],
4343
# Old samples are opted out of enforcing Python type hints
4444
# All new samples should feature them
4545
"enforce_type_hints": False,

composer/workflows/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@
55
# https://github.com/apache/airflow/blob/main/pyproject.toml
66

77
apache-airflow[amazon,apache.beam,cncf.kubernetes,google,microsoft.azure,openlineage,postgres]==2.9.2
8-
google-cloud-dataform==0.5.9 # used in Dataform operators
8+
google-cloud-dataform==0.5.9 # Used in Dataform operators
99
scipy==1.14.1

0 commit comments

Comments
 (0)