Skip to content

Commit 0e03d57

Browse files
committed
feat(discord): move all discord message sending to one dag and use asset to trigger
1 parent 9ff6fbc commit 0e03d57

File tree

8 files changed

+271
-217
lines changed

8 files changed

+271
-217
lines changed

dags/app/channel_reminder/dag.py

Lines changed: 45 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,45 @@
1-
"""
2-
Send Google Search Report to Discord
3-
"""
4-
5-
from datetime import datetime, timedelta
6-
7-
from airflow.sdk import Variable, dag, task
8-
from app import discord
9-
10-
DEFAULT_ARGS = {
11-
"owner": "David Jr.",
12-
"depends_on_past": False,
13-
"start_date": datetime(2022, 9, 15),
14-
"retries": 2,
15-
"retry_delay": timedelta(minutes=5),
16-
"on_failure_callback": lambda x: "Need to send notification to Discord",
17-
}
18-
19-
20-
@dag(
21-
default_args=DEFAULT_ARGS,
22-
schedule="@yearly",
23-
max_active_runs=1,
24-
catchup=False,
25-
)
26-
def DISCORD_CHORES_REMINDER() -> None:
27-
@task
28-
def REMINDER_OF_THIS_TEAM() -> None:
29-
webhook_url = Variable.get("DISCORD_CHORES_REMINDER_WEBHOOK")
30-
discord.send_webhook_message(
31-
webhook_url=webhook_url,
32-
username="Data Team Airflow reminder",
33-
msg=(
34-
"<@&790739794148982796> <@&755827317904769184> <@&791157626099859487>\n"
35-
"記得大會結束後,要有一個人負責去取消 Klaviyo 的訂閱,不然我們每個月會一直繳 $NTD2000 喔!"
36-
),
37-
)
38-
39-
REMINDER_OF_THIS_TEAM()
40-
41-
42-
dag_obj = DISCORD_CHORES_REMINDER()
43-
44-
if __name__ == "__main__":
45-
dag_obj.test()
1+
# """
2+
# Send Google Search Report to Discord
3+
# """
4+
#
5+
# from datetime import datetime, timedelta
6+
#
7+
# from airflow.sdk import Variable, dag, task
8+
# from app import discord
9+
#
10+
# DEFAULT_ARGS = {
11+
# "owner": "David Jr.",
12+
# "depends_on_past": False,
13+
# "start_date": datetime(2022, 9, 15),
14+
# "retries": 2,
15+
# "retry_delay": timedelta(minutes=5),
16+
# "on_failure_callback": lambda x: "Need to send notification to Discord",
17+
# }
18+
#
19+
#
20+
# @dag(
21+
# default_args=DEFAULT_ARGS,
22+
# schedule="@yearly",
23+
# max_active_runs=1,
24+
# catchup=False,
25+
# )
26+
# def DISCORD_CHORES_REMINDER() -> None:
27+
# @task
28+
# def REMINDER_OF_THIS_TEAM() -> None:
29+
# webhook_url = Variable.get("DISCORD_CHORES_REMINDER_WEBHOOK")
30+
# discord.send_webhook_message(
31+
# webhook_url=webhook_url,
32+
# username="Data Team Airflow reminder",
33+
# msg=(
34+
# "<@&790739794148982796> <@&755827317904769184> <@&791157626099859487>\n"
35+
# "記得大會結束後,要有一個人負責去取消 Klaviyo 的訂閱,不然我們每個月會一直繳 $NTD2000 喔!"
36+
# ),
37+
# )
38+
#
39+
# REMINDER_OF_THIS_TEAM()
40+
#
41+
#
42+
# dag_obj = DISCORD_CHORES_REMINDER()
43+
#
44+
# if __name__ == "__main__":
45+
# dag_obj.test()

dags/app/discord.py

Lines changed: 62 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,69 @@
1+
import logging
2+
from datetime import datetime, timedelta
3+
14
import requests
2-
import tenacity
5+
from airflow.sdk import Asset, AssetWatcher, Context, dag, task
6+
7+
from triggers.finance_report import FinanceReportTrigger
38

4-
session = requests.session()
9+
logger = logging.getLogger(__name__)
510

6-
RETRY_ARGS = dict(
7-
wait=tenacity.wait_random(min=1, max=10),
8-
stop=tenacity.stop_after_attempt(10),
9-
retry=tenacity.retry_if_exception_type(requests.exceptions.ConnectionError),
11+
finance_report_asset = Asset(
12+
name="finance_report",
13+
watchers=[
14+
AssetWatcher(
15+
name="finance_report_watcher",
16+
trigger=FinanceReportTrigger(
17+
poke_interval=5, # 60*60*24
18+
),
19+
)
20+
],
1021
)
1122

1223

13-
@tenacity.retry(**RETRY_ARGS) # type: ignore[call-overload]
14-
def send_webhook_message(webhook_url: str, username: str, msg: str) -> None:
15-
session.post(
16-
webhook_url,
17-
json={"username": username, "content": msg},
24+
@dag(
25+
schedule=(
26+
finance_report_asset
27+
| Asset.ref(name="CFP_summary")
28+
| Asset.ref(name="kktix_order_report")
29+
),
30+
start_date=datetime(2025, 6, 28),
31+
catchup=False,
32+
max_active_runs=1,
33+
default_args={
34+
"owner": "Wei Lee",
35+
"depends_on_past": False,
36+
},
37+
)
38+
def discord_message_notification():
39+
"""Send Discord Message"""
40+
41+
@task(
42+
retries=10,
43+
retry_delay=timedelta(seconds=10),
1844
)
45+
def send_discord_message(**context: Context) -> None:
46+
triggering_asset_events = context["triggering_asset_events"]
47+
session = requests.session()
48+
for asset_uri, asset_event in triggering_asset_events.items():
49+
logger.info(f"Receive asset event from Asset uri={asset_uri}")
50+
if asset_event.extra.get("from_trigger", False): # type: ignore[attr-defined]
51+
details = asset_event.extra["payload"] # type: ignore[attr-defined]
52+
else:
53+
details = asset_event.extra # type: ignore[attr-defined]
54+
55+
session.post(
56+
details.get("webhook_url"),
57+
json={
58+
"username": details.get("username"),
59+
"content": details.get("content"),
60+
},
61+
)
62+
63+
send_discord_message()
64+
65+
66+
dag_obj = discord_message_notification()
67+
68+
if __name__ == "__main__":
69+
dag_obj.test()

dags/app/finance_bot/dag.py

Lines changed: 0 additions & 62 deletions
This file was deleted.

dags/app/finance_bot/udf.py

Lines changed: 0 additions & 79 deletions
This file was deleted.

dags/app/proposal_reminder/dag.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@
44

55
from datetime import datetime, timedelta
66

7-
from airflow.sdk import Variable, dag, task
8-
from app import discord
7+
from airflow.sdk import Asset, Metadata, Variable, dag, task
98
from app.proposal_reminder.udf import get_proposal_summary
109

1110
DEFAULT_ARGS = {
@@ -18,29 +17,32 @@
1817
}
1918

2019

20+
cfp_summary_asset = Asset(name="CFP_summary")
21+
22+
2123
@dag(
2224
default_args=DEFAULT_ARGS,
2325
schedule="0 16 * * *", # At 16:00 (00:00 +8)
2426
max_active_runs=1,
2527
catchup=False,
2628
)
2729
def DISCORD_PROPOSAL_REMINDER_v3():
28-
@task
29-
def SEND_PROPOSAL_SUMMARY():
30-
webhook_url = Variable.get("DISCORD_PROGRAM_REMINDER_WEBHOOK")
31-
30+
@task(outlets=[cfp_summary_asset])
31+
def fetch_proposal_summary():
3232
summary = get_proposal_summary()
3333
n_talk = summary["num_proposed_talk"]
3434
n_tutorial = summary["num_proposed_tutorial"]
35-
msg = f"目前投稿議程數: {n_talk}; 課程數: {n_tutorial}"
3635

37-
discord.send_webhook_message(
38-
webhook_url=webhook_url,
39-
username="Program talk reminder",
40-
msg=msg,
36+
yield Metadata(
37+
cfp_summary_asset,
38+
extra={
39+
"webhook_url": Variable.get("DISCORD_PROGRAM_REMINDER_WEBHOOK"),
40+
"username": "Program talk reminder",
41+
"content": f"目前投稿議程數: {n_talk}; 課程數: {n_tutorial}",
42+
},
4143
)
4244

43-
SEND_PROPOSAL_SUMMARY()
45+
fetch_proposal_summary()
4446

4547

4648
dag_obj = DISCORD_PROPOSAL_REMINDER_v3()

0 commit comments

Comments
 (0)