Skip to content

Commit c3ae79f

Browse files
committed
feat(dags): rewrite dags with outlet in an asset manner
AIP-75
1 parent ea67601 commit c3ae79f

File tree

3 files changed

+65
-95
lines changed

3 files changed

+65
-95
lines changed

dags/app/discord.py

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
AssetWatcher(
1515
name="finance_report_watcher",
1616
trigger=FinanceReportTrigger(
17-
poke_interval=86400, # 60*60*24
17+
# poke_interval=86400, # 60*60*24
18+
poke_interval=5, # 60*60*24
1819
),
1920
)
2021
],
@@ -24,8 +25,8 @@
2425
@dag(
2526
schedule=(
2627
finance_report_asset
27-
| Asset.ref(name="CFP_summary")
28-
| Asset.ref(name="kktix_order_report")
28+
| Asset(name="proposal_count")
29+
| Asset(name="registration_statistics")
2930
),
3031
start_date=datetime(2025, 6, 28),
3132
catchup=False,
@@ -45,20 +46,22 @@ def discord_message_notification():
4546
def send_discord_message(**context: Context) -> None:
4647
triggering_asset_events = context["triggering_asset_events"]
4748
session = requests.session()
48-
for asset_uri, asset_event in triggering_asset_events.items():
49+
logger.info(f"Receive asset events {triggering_asset_events}")
50+
for asset_uri, asset_events in triggering_asset_events.items():
4951
logger.info(f"Receive asset event from Asset uri={asset_uri}")
50-
if asset_event.extra.get("from_trigger", False): # type: ignore[attr-defined]
51-
details = asset_event.extra["payload"] # type: ignore[attr-defined]
52-
else:
53-
details = asset_event.extra # type: ignore[attr-defined]
52+
for asset_event in asset_events: # type: ignore[attr-defined]
53+
if asset_event.extra.get("from_trigger", False):
54+
details = asset_event.extra["payload"]
55+
else:
56+
details = asset_event.extra
5457

55-
session.post(
56-
details.get("webhook_url"),
57-
json={
58-
"username": details.get("username"),
59-
"content": details.get("content"),
60-
},
61-
)
58+
session.post(
59+
details.get("webhook_url"),
60+
json={
61+
"username": details.get("username"),
62+
"content": details.get("content"),
63+
},
64+
)
6265

6366
send_discord_message()
6467

dags/app/proposal_reminder/dag.py

Lines changed: 25 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2,50 +2,34 @@
22
Send Proposal Summary to Discord
33
"""
44

5-
from datetime import datetime, timedelta
6-
7-
from airflow.sdk import Asset, Metadata, Variable, dag, task
5+
from airflow.sdk import Metadata, Variable, asset
86
from app.proposal_reminder.udf import get_proposal_summary
97

10-
DEFAULT_ARGS = {
11-
"owner": "Henry Lee",
12-
"depends_on_past": False,
13-
"start_date": datetime(2025, 2, 25),
14-
"end_date": datetime(2025, 4, 9),
15-
"retries": 2,
16-
"retry_delay": timedelta(minutes=5),
17-
}
18-
8+
# DEFAULT_ARGS = {
9+
# "owner": "Henry Lee",
10+
# "depends_on_past": False,
11+
# "start_date": datetime(2025, 2, 25),
12+
# "end_date": datetime(2025, 4, 9),
13+
# "retries": 2,
14+
# "retry_delay": timedelta(minutes=5),
15+
# }
1916

20-
cfp_summary_asset = Asset(name="CFP_summary")
2117

22-
23-
@dag(
24-
default_args=DEFAULT_ARGS,
18+
@asset(
19+
name="proposal_count",
20+
dag_id="proposal_count",
2521
schedule="0 16 * * *", # At 16:00 (00:00 +8)
26-
max_active_runs=1,
27-
catchup=False,
2822
)
29-
def DISCORD_PROPOSAL_REMINDER_v3():
30-
@task(outlets=[cfp_summary_asset])
31-
def fetch_proposal_summary():
32-
summary = get_proposal_summary()
33-
n_talk = summary["num_proposed_talk"]
34-
n_tutorial = summary["num_proposed_tutorial"]
35-
36-
yield Metadata(
37-
cfp_summary_asset,
38-
extra={
39-
"webhook_url": Variable.get("DISCORD_PROGRAM_REMINDER_WEBHOOK"),
40-
"username": "Program talk reminder",
41-
"content": f"目前投稿議程數: {n_talk}; 課程數: {n_tutorial}",
42-
},
43-
)
44-
45-
fetch_proposal_summary()
46-
47-
48-
dag_obj = DISCORD_PROPOSAL_REMINDER_v3()
49-
50-
if __name__ == "__main__":
51-
dag_obj.test()
23+
def DISCORD_PROPOSAL_REMINDER_v3(self):
24+
summary = get_proposal_summary()
25+
n_talk = summary["num_proposed_talk"]
26+
n_tutorial = summary["num_proposed_tutorial"]
27+
28+
yield Metadata(
29+
self,
30+
extra={
31+
"webhook_url": Variable.get("DISCORD_PROGRAM_REMINDER_WEBHOOK"),
32+
"username": "Program talk reminder",
33+
"content": f"目前投稿議程數: {n_talk}; 課程數: {n_tutorial}",
34+
},
35+
)

dags/app/team_registration_bot/dag.py

Lines changed: 22 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2,51 +2,34 @@
22
Send daily ordering metrics to discord channel
33
"""
44

5-
from datetime import datetime, timedelta
6-
7-
from airflow.sdk import Asset, Metadata, Variable, dag, task
5+
from airflow.sdk import Metadata, Variable, asset
86
from app.team_registration_bot.udf import (
97
_compose_discord_msg,
108
_get_statistics_from_bigquery,
119
)
1210

13-
DEFAULT_ARGS = {
14-
"owner": "David Jr.",
15-
"depends_on_past": False,
16-
"start_date": datetime(2022, 7, 4),
17-
"retries": 2,
18-
"retry_delay": timedelta(minutes=5),
19-
"on_failure_callback": lambda x: "Need to send notification to Discord!",
20-
}
21-
11+
# DEFAULT_ARGS = {
12+
# "owner": "David Jr.",
13+
# "depends_on_past": False,
14+
# "start_date": datetime(2022, 7, 4),
15+
# "retries": 2,
16+
# "retry_delay": timedelta(minutes=5),
17+
# "on_failure_callback": lambda x: "Need to send notification to Discord!",
18+
# }
2219

23-
kktix_order_report_asset = Asset(name="kktix_order_report")
2420

25-
26-
@dag(
27-
default_args=DEFAULT_ARGS,
21+
@asset(
22+
name="registration_statistics",
23+
dag_id="registration_statistics",
2824
schedule="@daily",
29-
max_active_runs=1,
30-
catchup=False,
3125
)
32-
def KKTIX_DISCORD_BOT_FOR_TEAM_REGISTRATION():
33-
@task(outlets=[kktix_order_report_asset])
34-
def LOAD_TO_DISCORD():
35-
statistics = _get_statistics_from_bigquery()
36-
yield Metadata(
37-
kktix_order_report_asset,
38-
extra={
39-
"webhook_url": Variable.get("discord_webhook_registration_endpoint"),
40-
"username": "KKTIX order report",
41-
"content": _compose_discord_msg(statistics),
42-
},
43-
)
44-
45-
LOAD_TO_DISCORD()
46-
47-
48-
dag_obj = KKTIX_DISCORD_BOT_FOR_TEAM_REGISTRATION()
49-
50-
51-
if __name__ == "__main__":
52-
dag_obj.test()
26+
def KKTIX_DISCORD_BOT_FOR_TEAM_REGISTRATION(self):
27+
statistics = _get_statistics_from_bigquery()
28+
yield Metadata(
29+
self,
30+
extra={
31+
"webhook_url": Variable.get("discord_webhook_registration_endpoint"),
32+
"username": "KKTIX order report",
33+
"content": _compose_discord_msg(statistics),
34+
},
35+
)

0 commit comments

Comments
 (0)