Skip to content

Commit 9ff6fbc

Browse files
Merge pull request #163 from pycontw/airflow-3-bug-fixes
Add airflow triggerer, update config to fit our limited resources, and fix a bug found in the posts_insights dag
2 parents 6f11389 + db3128c commit 9ff6fbc

File tree

4 files changed

+43
-2
lines changed

4 files changed

+43
-2
lines changed

airflow.cfg

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ auth_manager = airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManage
2323
[dag_processor]
2424
# How long before timing out a DagFileProcessor, which processes a dag file
2525
dag_file_processor_timeout = 600
26+
# We don't really need it based on how we deploy dags.
27+
# But set it to a large enough value, 60 * 24 (1 day), to avoid high CPU usage
28+
min_file_process_interval = 1440
2629

2730
[database]
2831
# The SqlAlchemy connection string to the metadata database.
@@ -34,7 +37,8 @@ external_db_managers = airflow.providers.fab.auth_manager.models.db.FABDBManager
3437

3538
[api]
3639
# Number of workers to run the Gunicorn web server
37-
workers = 2
40+
# WARNING: DO NOT increase this number. Due to our limited resources, increasing it to 2 breaks the API server
41+
workers = 1
3842

3943
# Number of seconds the gunicorn webserver waits before timing out on a worker
4044
worker_timeout = 600

dags/utils/posts_insights/base.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,14 +126,21 @@ def _dump_to_bigquery(
126126
logger.info(f"No {dump_type} to dump!")
127127
return
128128

129+
if dump_type == "posts":
130+
target_table = self.POST_TABLE_NAME
131+
elif dump_type == "posts insights":
132+
target_table = self.INSIGHT_TABLE_NAME
133+
else:
134+
raise ValueError(f"Unexpected dump_type {dump_type}")
135+
129136
job_config = bigquery.LoadJobConfig(
130137
schema=bq_schema_fields,
131138
write_disposition="WRITE_APPEND",
132139
)
133140
try:
134141
job = self.bq_client.load_table_from_json(
135142
posts,
136-
f"pycontw-225217.ods.{self.INSIGHT_TABLE_NAME}",
143+
f"pycontw-225217.ods.{target_table}",
137144
job_config=job_config,
138145
)
139146
job.result()

docker-compose-dev.yml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,21 @@ services:
6363
airflow-init:
6464
condition: service_completed_successfully
6565

66+
airflow-triggerer:
67+
<<: *docker-common
68+
container_name: airflow-triggerer
69+
command: triggerer
70+
healthcheck:
71+
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
72+
interval: 30s
73+
timeout: 10s
74+
retries: 5
75+
start_period: 30s
76+
restart: always
77+
depends_on:
78+
airflow-init:
79+
condition: service_completed_successfully
80+
6681
airflow-init:
6782
<<: *docker-common
6883
entrypoint: /bin/bash

docker-compose.yml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,21 @@ services:
5858
airflow-init:
5959
condition: service_completed_successfully
6060

61+
airflow-triggerer:
62+
<<: *docker-common
63+
container_name: airflow-triggerer
64+
command: triggerer
65+
healthcheck:
66+
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
67+
interval: 30s
68+
timeout: 10s
69+
retries: 5
70+
start_period: 30s
71+
restart: always
72+
depends_on:
73+
airflow-init:
74+
condition: service_completed_successfully
75+
6176
airflow-init:
6277
<<: *docker-common
6378
entrypoint: /bin/bash

0 commit comments

Comments
 (0)