Skip to content

Commit ab5eb4a

Browse files
committed
feat(discord): move all discord message sending to one dag and use asset to trigger
1 parent ae555c8 commit ab5eb4a

File tree

7 files changed

+227
-177
lines changed

7 files changed

+227
-177
lines changed

dags/app/discord.py

Lines changed: 62 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,69 @@
1+
import logging
2+
from datetime import datetime, timedelta
3+
14
import requests
2-
import tenacity
5+
from airflow.sdk import Asset, AssetWatcher, Context, dag, task
6+
7+
from triggers.finance_report import FinanceReportTrigger
38

4-
session = requests.session()
9+
logger = logging.getLogger(__name__)
510

6-
RETRY_ARGS = dict(
7-
wait=tenacity.wait_random(min=1, max=10),
8-
stop=tenacity.stop_after_attempt(10),
9-
retry=tenacity.retry_if_exception_type(requests.exceptions.ConnectionError),
11+
finance_report_asset = Asset(
12+
name="finance_report",
13+
watchers=[
14+
AssetWatcher(
15+
name="finance_report_watcher",
16+
trigger=FinanceReportTrigger(
17+
poke_interval=5, # 60*60*24
18+
),
19+
)
20+
],
1021
)
1122

1223

13-
@tenacity.retry(**RETRY_ARGS) # type: ignore[call-overload]
14-
def send_webhook_message(webhook_url: str, username: str, msg: str) -> None:
15-
session.post(
16-
webhook_url,
17-
json={"username": username, "content": msg},
24+
@dag(
25+
schedule=(
26+
finance_report_asset
27+
| Asset.ref(name="CFP_summary")
28+
| Asset.ref(name="kktix_order_report")
29+
),
30+
start_date=datetime(2025, 6, 28),
31+
catchup=False,
32+
max_active_runs=1,
33+
default_args={
34+
"owner": "Wei Lee",
35+
"depends_on_past": False,
36+
},
37+
)
38+
def discord_message_notification():
39+
"""Send Discord Message"""
40+
41+
@task(
42+
retries=10,
43+
retry_delay=timedelta(seconds=10),
1844
)
45+
def send_discord_message(**context: Context) -> None:
46+
triggering_asset_events = context["triggering_asset_events"]
47+
session = requests.session()
48+
for asset_uri, asset_event in triggering_asset_events.items():
49+
logger.info(f"Receive asset event from Asset uri={asset_uri}")
50+
if asset_event.extra.get("from_trigger", False): # type: ignore[attr-defined]
51+
details = asset_event.extra["payload"] # type: ignore[attr-defined]
52+
else:
53+
details = asset_event.extra # type: ignore[attr-defined]
54+
55+
session.post(
56+
details.get("webhook_url"),
57+
json={
58+
"username": details.get("username"),
59+
"content": details.get("content"),
60+
},
61+
)
62+
63+
send_discord_message()
64+
65+
66+
dag_obj = discord_message_notification()
67+
68+
if __name__ == "__main__":
69+
dag_obj.test()

dags/app/finance_bot/dag.py

Lines changed: 0 additions & 63 deletions
This file was deleted.

dags/app/finance_bot/udf.py

Lines changed: 0 additions & 79 deletions
This file was deleted.

dags/app/proposal_reminder/dag.py

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@
44

55
from datetime import datetime, timedelta
66

7-
from airflow.sdk import Variable, dag, task
8-
9-
from dags.app import discord
10-
from dags.app.proposal_reminder.udf import get_proposal_summary
7+
from airflow.sdk import Asset, Metadata, Variable, dag, task
8+
from app.proposal_reminder.udf import get_proposal_summary
119

1210
DEFAULT_ARGS = {
1311
"owner": "Henry Lee",
@@ -19,29 +17,32 @@
1917
}
2018

2119

20+
cfp_summary_asset = Asset(name="CFP_summary")
21+
22+
2223
@dag(
2324
default_args=DEFAULT_ARGS,
2425
schedule="0 16 * * *", # At 16:00 (00:00 +8)
2526
max_active_runs=1,
2627
catchup=False,
2728
)
2829
def DISCORD_PROPOSAL_REMINDER_v3():
29-
@task
30-
def SEND_PROPOSAL_SUMMARY():
31-
webhook_url = Variable.get("DISCORD_PROGRAM_REMINDER_WEBHOOK")
32-
30+
@task(outlets=[cfp_summary_asset])
31+
def fetch_proposal_summary():
3332
summary = get_proposal_summary()
3433
n_talk = summary["num_proposed_talk"]
3534
n_tutorial = summary["num_proposed_tutorial"]
36-
msg = f"目前投稿議程數: {n_talk}; 課程數: {n_tutorial}"
3735

38-
discord.send_webhook_message(
39-
webhook_url=webhook_url,
40-
username="Program talk reminder",
41-
msg=msg,
36+
yield Metadata(
37+
cfp_summary_asset,
38+
extra={
39+
"webhook_url": Variable.get("DISCORD_PROGRAM_REMINDER_WEBHOOK"),
40+
"username": "Program talk reminder",
41+
"content": f"目前投稿議程數: {n_talk}; 課程數: {n_tutorial}",
42+
},
4243
)
4344

44-
SEND_PROPOSAL_SUMMARY()
45+
fetch_proposal_summary()
4546

4647

4748
dag_obj = DISCORD_PROPOSAL_REMINDER_v3()

dags/app/team_registration_bot/dag.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@
44

55
from datetime import datetime, timedelta
66

7-
from airflow.sdk import Variable, dag, task
8-
9-
from dags.app import discord
10-
from dags.app.team_registration_bot.udf import (
7+
from airflow.sdk import Asset, Metadata, Variable, dag, task
8+
from app.team_registration_bot.udf import (
119
_compose_discord_msg,
1210
_get_statistics_from_bigquery,
1311
)
@@ -22,21 +20,26 @@
2220
}
2321

2422

23+
kktix_order_report_asset = Asset(name="kktix_order_report")
24+
25+
2526
@dag(
2627
default_args=DEFAULT_ARGS,
2728
schedule="@daily",
2829
max_active_runs=1,
2930
catchup=False,
3031
)
3132
def KKTIX_DISCORD_BOT_FOR_TEAM_REGISTRATION():
32-
@task
33+
@task(outlets=[kktix_order_report_asset])
3334
def LOAD_TO_DISCORD():
34-
webhook_url = Variable.get("discord_webhook_registration_endpoint")
3535
statistics = _get_statistics_from_bigquery()
36-
discord.send_webhook_message(
37-
webhook_url=webhook_url,
38-
usernmae="KKTIX order report",
39-
msg=_compose_discord_msg(statistics),
36+
yield Metadata(
37+
kktix_order_report_asset,
38+
extra={
39+
"webhook_url": Variable.get("discord_webhook_registration_endpoint"),
40+
"username": "KKTIX order report",
41+
"content": _compose_discord_msg(statistics),
42+
},
4043
)
4144

4245
LOAD_TO_DISCORD()
File renamed without changes.

0 commit comments

Comments
 (0)