From ab5eb4a0760b29d44a40b0957e6a8058da85eb64 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Sat, 28 Jun 2025 00:01:57 +0800 Subject: [PATCH 1/4] feat(discord): move all discord message sending to one dag and use asset to trigger --- dags/app/discord.py | 73 ++++++++-- dags/app/finance_bot/dag.py | 63 -------- dags/app/finance_bot/udf.py | 79 ---------- dags/app/proposal_reminder/dag.py | 29 ++-- dags/app/team_registration_bot/dag.py | 23 +-- .../app/finance_bot => triggers}/__init__.py | 0 triggers/finance_report.py | 137 ++++++++++++++++++ 7 files changed, 227 insertions(+), 177 deletions(-) delete mode 100644 dags/app/finance_bot/dag.py delete mode 100644 dags/app/finance_bot/udf.py rename {dags/app/finance_bot => triggers}/__init__.py (100%) create mode 100644 triggers/finance_report.py diff --git a/dags/app/discord.py b/dags/app/discord.py index 17aa775a..54cbce37 100644 --- a/dags/app/discord.py +++ b/dags/app/discord.py @@ -1,18 +1,69 @@ +import logging +from datetime import datetime, timedelta + import requests -import tenacity +from airflow.sdk import Asset, AssetWatcher, Context, dag, task + +from triggers.finance_report import FinanceReportTrigger -session = requests.session() +logger = logging.getLogger(__name__) -RETRY_ARGS = dict( - wait=tenacity.wait_random(min=1, max=10), - stop=tenacity.stop_after_attempt(10), - retry=tenacity.retry_if_exception_type(requests.exceptions.ConnectionError), +finance_report_asset = Asset( + name="finance_report", + watchers=[ + AssetWatcher( + name="finance_report_watcher", + trigger=FinanceReportTrigger( + poke_interval=5, # 60*60*24 + ), + ) + ], ) -@tenacity.retry(**RETRY_ARGS) # type: ignore[call-overload] -def send_webhook_message(webhook_url: str, username: str, msg: str) -> None: - session.post( - webhook_url, - json={"username": username, "content": msg}, +@dag( + schedule=( + finance_report_asset + | Asset.ref(name="CFP_summary") + | Asset.ref(name="kktix_order_report") + ), + start_date=datetime(2025, 6, 28), + catchup=False, + max_active_runs=1, + default_args={ + "owner": "Wei Lee", + "depends_on_past": False, + }, +) +def discord_message_notification(): + """Send Discord Message""" + + @task( + retries=10, + retry_delay=timedelta(seconds=10), ) + def send_discord_message(**context: Context) -> None: + triggering_asset_events = context["triggering_asset_events"] + session = requests.session() + for asset_uri, asset_event in triggering_asset_events.items(): + logger.info(f"Receive asset event from Asset uri={asset_uri}") + if asset_event.extra.get("from_trigger", False): # type: ignore[attr-defined] + details = asset_event.extra["payload"] # type: ignore[attr-defined] + else: + details = asset_event.extra # type: ignore[attr-defined] + + session.post( + details.get("webhook_url"), + json={ + "username": details.get("username"), + "content": details.get("content"), + }, + ) + + send_discord_message() + + +dag_obj = discord_message_notification() + +if __name__ == "__main__": + dag_obj.test() diff --git a/dags/app/finance_bot/dag.py b/dags/app/finance_bot/dag.py deleted file mode 100644 index fe9d2495..00000000 --- a/dags/app/finance_bot/dag.py +++ /dev/null @@ -1,63 +0,0 @@ -""" -Send Google Search Report to Discord -""" - -from datetime import datetime, timedelta - -from airflow.sdk import Variable, dag, task - -from dags.app import discord -from dags.app.finance_bot.udf import ( - df_difference, - read_bigquery_to_df, - read_google_xls_to_df, - refine_diff_df_to_string, - write_to_bigquery, -) - -DEFAULT_ARGS = { - "owner": "CHWan", - "depends_on_past": False, - "start_date": datetime(2023, 8, 27), - "retries": 2, - "retry_delay": timedelta(minutes=5), - "on_failure_callback": lambda x: "Need to send notification to Discord", -} - - -@dag( - default_args=DEFAULT_ARGS, - schedule="@daily", - max_active_runs=1, - catchup=False, -) -def DISCORD_FINANCE_REMINDER(): - @task - def REMINDER_OF_THIS_TEAM(): - # read xls from google doc to df. - df_xls = read_google_xls_to_df() - - # read bigquery to df. - df_bigquery = read_bigquery_to_df() - - # check difference between 2 df - df_diff = df_difference(df_xls, df_bigquery) - - # link to bigquery and write xls file - write_to_bigquery(df_diff) - - # push to discord - webhook_url = Variable.get("discord_data_stratagy_webhook") - msg = refine_diff_df_to_string(df_diff) - if msg != "no data": - discord.send_webhook_message( - webhook_url=webhook_url, username="財務機器人", msg=msg - ) - - REMINDER_OF_THIS_TEAM() - - -dag_obj = DISCORD_FINANCE_REMINDER() - -if __name__ == "__main__": - dag_obj.test() diff --git a/dags/app/finance_bot/udf.py b/dags/app/finance_bot/udf.py deleted file mode 100644 index 55b35633..00000000 --- a/dags/app/finance_bot/udf.py +++ /dev/null @@ -1,79 +0,0 @@ -import os - -import numpy as np -import pandas as pd -import pygsheets -from airflow.sdk import Variable -from google.cloud import bigquery - - -def df_difference(df_xls: pd.DataFrame, df_bigquery: pd.DataFrame) -> pd.DataFrame: - merged = pd.merge(df_xls, df_bigquery, how="outer", indicator=True) - return merged[merged["_merge"] == "left_only"].drop("_merge", axis=1) - - -def read_bigquery_to_df() -> pd.DataFrame: - client = bigquery.Client() - query = "SELECT * FROM `pycontw-225217.ods.pycontw_finance`" - query_job = client.query(query) - results = query_job.result() - schema = results.schema - column_names = [field.name for field in schema] - data = [list(row.values()) for row in results] - df = pd.DataFrame(data=data, columns=column_names) - return df - - -def read_google_xls_to_df() -> pd.DataFrame: - service_file = os.getenv("GOOGLE_APPLICATION_CREDENTIALS") - finance_xls_path = Variable.get("finance_xls_path") - - gc = pygsheets.authorize(service_file=service_file) - sheet = gc.open_by_url(finance_xls_path) - df = sheet.sheet1.get_as_df(include_tailing_empty=False) - df.replace("", np.nan, inplace=True) - df.dropna(inplace=True) - df = df.astype(str) - df.columns = [ - "Reason", - "Price", - "Remarks", - "Team_name", - "Details", - "To_who", - "Yes_or_No", - ] - return df - - -def write_to_bigquery(df) -> None: - project_id = "pycontw-225217" - dataset_id = "ods" - table_id = "pycontw_finance" - client = bigquery.Client(project=project_id) - table = client.dataset(dataset_id).table(table_id) - schema = [ - bigquery.SchemaField(field_name, "STRING", mode="REQUIRED") - for field_name in [ - "Reason", - "Price", - "Remarks", - "Team_name", - "Details", - "To_who", - "Yes_or_No", - ] - ] - job_config = bigquery.LoadJobConfig(schema=schema) - job = client.load_table_from_dataframe(df, table, job_config=job_config) - job.result() - - -def refine_diff_df_to_string(df: pd.DataFrame) -> str: - if df.empty: - return "no data" - - return "\n".join( - f"{row[0]}, 花費: {row[1]}, {row[3]}, {row[4]}" - for row in df.itertuples(index=False) - ) diff --git a/dags/app/proposal_reminder/dag.py b/dags/app/proposal_reminder/dag.py index 45def703..d78b2db5 100644 --- a/dags/app/proposal_reminder/dag.py +++ b/dags/app/proposal_reminder/dag.py @@ -4,10 +4,8 @@ from datetime import datetime, timedelta -from airflow.sdk import Variable, dag, task - -from dags.app import discord -from dags.app.proposal_reminder.udf import get_proposal_summary +from airflow.sdk import Asset, Metadata, Variable, dag, task +from app.proposal_reminder.udf import get_proposal_summary DEFAULT_ARGS = { "owner": "Henry Lee", @@ -19,6 +17,9 @@ } +cfp_summary_asset = Asset(name="CFP_summary") + + @dag( default_args=DEFAULT_ARGS, schedule="0 16 * * *", # At 16:00 (00:00 +8) @@ -26,22 +27,22 @@ catchup=False, ) def DISCORD_PROPOSAL_REMINDER_v3(): - @task - def SEND_PROPOSAL_SUMMARY(): - webhook_url = Variable.get("DISCORD_PROGRAM_REMINDER_WEBHOOK") - + @task(outlets=[cfp_summary_asset]) + def fetch_proposal_summary(): summary = get_proposal_summary() n_talk = summary["num_proposed_talk"] n_tutorial = summary["num_proposed_tutorial"] - msg = f"目前投稿議程數: {n_talk}; 課程數: {n_tutorial}" - discord.send_webhook_message( - webhook_url=webhook_url, - username="Program talk reminder", - msg=msg, + yield Metadata( + cfp_summary_asset, + extra={ + "webhook_url": Variable.get("DISCORD_PROGRAM_REMINDER_WEBHOOK"), + "username": "Program talk reminder", + "content": f"目前投稿議程數: {n_talk}; 課程數: {n_tutorial}", + }, ) - SEND_PROPOSAL_SUMMARY() + fetch_proposal_summary() dag_obj = DISCORD_PROPOSAL_REMINDER_v3() diff --git a/dags/app/team_registration_bot/dag.py b/dags/app/team_registration_bot/dag.py index 2e1d96aa..785fe2d8 100644 --- a/dags/app/team_registration_bot/dag.py +++ b/dags/app/team_registration_bot/dag.py @@ -4,10 +4,8 @@ from datetime import datetime, timedelta -from airflow.sdk import Variable, dag, task - -from dags.app import discord -from dags.app.team_registration_bot.udf import ( +from airflow.sdk import Asset, Metadata, Variable, dag, task +from app.team_registration_bot.udf import ( _compose_discord_msg, _get_statistics_from_bigquery, ) @@ -22,6 +20,9 @@ } +kktix_order_report_asset = Asset(name="kktix_order_report") + + @dag( default_args=DEFAULT_ARGS, schedule="@daily", @@ -29,14 +30,16 @@ catchup=False, ) def KKTIX_DISCORD_BOT_FOR_TEAM_REGISTRATION(): - @task + @task(outlets=[kktix_order_report_asset]) def LOAD_TO_DISCORD(): - webhook_url = Variable.get("discord_webhook_registration_endpoint") statistics = _get_statistics_from_bigquery() - discord.send_webhook_message( - webhook_url=webhook_url, - usernmae="KKTIX order report", - msg=_compose_discord_msg(statistics), + yield Metadata( + kktix_order_report_asset, + extra={ + "webhook_url": Variable.get("discord_webhook_registration_endpoint"), + "username": "KKTIX order report", + "content": _compose_discord_msg(statistics), + }, ) LOAD_TO_DISCORD() diff --git a/dags/app/finance_bot/__init__.py b/triggers/__init__.py similarity index 100% rename from dags/app/finance_bot/__init__.py rename to triggers/__init__.py diff --git a/triggers/finance_report.py b/triggers/finance_report.py new file mode 100644 index 00000000..7743a116 --- /dev/null +++ b/triggers/finance_report.py @@ -0,0 +1,137 @@ +from __future__ import annotations + +import asyncio +import os +from collections.abc import AsyncIterator +from datetime import datetime, timedelta +from typing import Any + +import numpy as np +import pandas as pd +import pygsheets +from airflow.sdk import Variable +from airflow.triggers.base import BaseEventTrigger, TriggerEvent +from google.cloud import bigquery + +# from datetime import datetime, timedelta +# +# DEFAULT_ARGS = { +# "owner": "CHWan", +# "depends_on_past": False, +# "start_date": datetime(2023, 8, 27), +# "retries": 2, +# "retry_delay": timedelta(minutes=5), +# "on_failure_callback": lambda x: "Need to send notification to Discord", +# } +# + + +FINANCE_REPORT_COUMNS = [ + "Reason", + "Price", + "Remarks", + "Team_name", + "Details", + "To_who", + "Yes_or_No", +] + + +def read_bigquery_to_df() -> pd.DataFrame: + client = bigquery.Client() + query = "SELECT * FROM `pycontw-225217.ods.pycontw_finance`" + query_job = client.query(query) + results = query_job.result() + schema = results.schema + column_names = [field.name for field in schema] + data = [list(row.values()) for row in results] + df = pd.DataFrame(data=data, columns=column_names) + return df + + +def read_google_xls_to_df() -> pd.DataFrame: + service_file = os.getenv("GOOGLE_APPLICATION_CREDENTIALS") + finance_xls_path = Variable.get("finance_xls_path") + + gc = pygsheets.authorize(service_file=service_file) + sheet = gc.open_by_url(finance_xls_path) + df = sheet.sheet1.get_as_df(include_tailing_empty=False) + df.replace("", np.nan, inplace=True) + df.dropna(inplace=True) + df = df.astype(str) + df.columns = FINANCE_REPORT_COUMNS + return df + + +def df_difference(df_xls: pd.DataFrame, df_bigquery: pd.DataFrame) -> pd.DataFrame: + merged = pd.merge(df_xls, df_bigquery, how="outer", indicator=True) + return merged[merged["_merge"] == "left_only"].drop("_merge", axis=1) + + +def write_to_bigquery(df: pd.DataFrame) -> None: + project_id = "pycontw-225217" + dataset_id = "ods" + table_id = "pycontw_finance" + client = bigquery.Client(project=project_id) + table = client.dataset(dataset_id).table(table_id) + schema = [ + bigquery.SchemaField(field_name, "STRING", mode="REQUIRED") + for field_name in FINANCE_REPORT_COUMNS + ] + job_config = bigquery.LoadJobConfig(schema=schema) + job = client.load_table_from_dataframe(df, table, job_config=job_config) + job.result() + + +def format_diff_df_as_message(diff_df: pd.DataFrame) -> str: + return "\n".join( + f"{row[0]}, 花費: {row[1]}, {row[3]}, {row[4]}" + for row in diff_df.itertuples(index=False) + ) + + +class FinanceReportTrigger(BaseEventTrigger): + def __init__(self, poke_interval: float) -> None: + super().__init__() + self.poke_interval = poke_interval + + def serialize(self) -> tuple[str, dict[str, Any]]: + """Serialize FinanceReportWatcher arguments and classpath.""" + return ( + "triggers.finance_report.FinanceReportTrigger", + {"poke_interval": self.poke_interval}, + ) + + async def run(self) -> AsyncIterator[TriggerEvent]: + """Loop until the finance report is updated.""" + + while True: + self.log.info("Checking Finance Report") + # TODO: improve it with asycn code + # read xls from google doc to df. + df_xls = read_google_xls_to_df() + + # read bigquery to df. + df_bigquery = read_bigquery_to_df() + + # check difference between 2 df + df_diff = df_difference(df_xls, df_bigquery) + + if not df_diff.empty: + # link to bigquery and write xls file + write_to_bigquery(df_diff) + + # push to discord + msg = format_diff_df_as_message(df_diff) + yield TriggerEvent( + { + "webhook_url": Variable.get("discord_data_stratagy_webhook"), + "username": "財務機器人", + "content": msg, + } + ) + self.log.info( + "Finish checking Finance Report. " + f"The next poke will happen in {datetime.now() + timedelta(seconds=self.poke_interval)}" + ) + await asyncio.sleep(self.poke_interval) From ea67601c926086794221526a85874ba477f6a524 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Sat, 28 Jun 2025 22:54:29 +0800 Subject: [PATCH 2/4] build(docker): add triggers into docker image and add airflow home as python path --- Dockerfile | 1 + Dockerfile.test | 1 + dags/app/discord.py | 2 +- triggers/finance_report.py | 2 +- 4 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 6776a42b..4b3876d3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -38,6 +38,7 @@ ENV PYTHONPATH="${AIRFLOW_HOME}:$PYTHONPATH" COPY airflow.cfg ${AIRFLOW_HOME}/airflow.cfg COPY --chown=airflow:root dags ${AIRFLOW_HOME}/dags +COPY --chown=airflow:root triggers ${AIRFLOW_HOME}/triggers # TODO: remove this and the patch files once upgrade to 3.0.3 COPY --chown=airflow:root patch/utils.py /app/.venv/lib/python3.10/site-packages/airflow/migrations/utils.py COPY --chown=airflow:root patch/versions /app/.venv/lib/python3.10/site-packages/airflow/migrations/versions diff --git a/Dockerfile.test b/Dockerfile.test index af78173b..58686204 100644 --- a/Dockerfile.test +++ b/Dockerfile.test @@ -38,6 +38,7 @@ ENV PYTHONPATH="${AIRFLOW_HOME}:$PYTHONPATH" COPY airflow.cfg ${AIRFLOW_HOME}/airflow.cfg COPY --chown=airflow:root dags ${AIRFLOW_HOME}/dags +COPY --chown=airflow:root triggers ${AIRFLOW_HOME}/triggers # TODO: remove this and the patch files once upgrade to 3.0.3 COPY --chown=airflow:root patch/utils.py /app/.venv/lib/python3.10/site-packages/airflow/migrations/utils.py COPY --chown=airflow:root patch/versions /app/.venv/lib/python3.10/site-packages/airflow/migrations/versions diff --git a/dags/app/discord.py b/dags/app/discord.py index 54cbce37..489c31df 100644 --- a/dags/app/discord.py +++ b/dags/app/discord.py @@ -14,7 +14,7 @@ AssetWatcher( name="finance_report_watcher", trigger=FinanceReportTrigger( - poke_interval=5, # 60*60*24 + poke_interval=86400, # 60*60*24 ), ) ], diff --git a/triggers/finance_report.py b/triggers/finance_report.py index 7743a116..52e55a7a 100644 --- a/triggers/finance_report.py +++ b/triggers/finance_report.py @@ -105,9 +105,9 @@ def serialize(self) -> tuple[str, dict[str, Any]]: async def run(self) -> AsyncIterator[TriggerEvent]: """Loop until the finance report is updated.""" + # TODO: improve it with asycn code while True: self.log.info("Checking Finance Report") - # TODO: improve it with asycn code # read xls from google doc to df. df_xls = read_google_xls_to_df() From c3ae79f139d0680d7c8afda41694ad27820bea1c Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Sun, 6 Jul 2025 11:18:36 +0800 Subject: [PATCH 3/4] feat(dags): rewrite dags with outlet in an asset manner AIP-75 --- dags/app/discord.py | 33 ++++++++------ dags/app/proposal_reminder/dag.py | 66 ++++++++++----------------- dags/app/team_registration_bot/dag.py | 61 +++++++++---------------- 3 files changed, 65 insertions(+), 95 deletions(-) diff --git a/dags/app/discord.py b/dags/app/discord.py index 489c31df..6f4f7599 100644 --- a/dags/app/discord.py +++ b/dags/app/discord.py @@ -14,7 +14,8 @@ AssetWatcher( name="finance_report_watcher", trigger=FinanceReportTrigger( - poke_interval=86400, # 60*60*24 + # poke_interval=86400, # 60*60*24 + poke_interval=5, # 60*60*24 ), ) ], @@ -24,8 +25,8 @@ @dag( schedule=( finance_report_asset - | Asset.ref(name="CFP_summary") - | Asset.ref(name="kktix_order_report") + | Asset(name="proposal_count") + | Asset(name="registration_statistics") ), start_date=datetime(2025, 6, 28), catchup=False, @@ -45,20 +46,22 @@ def discord_message_notification(): def send_discord_message(**context: Context) -> None: triggering_asset_events = context["triggering_asset_events"] session = requests.session() - for asset_uri, asset_event in triggering_asset_events.items(): + logger.info(f"Receive asset events {triggering_asset_events}") + for asset_uri, asset_events in triggering_asset_events.items(): logger.info(f"Receive asset event from Asset uri={asset_uri}") - if asset_event.extra.get("from_trigger", False): # type: ignore[attr-defined] - details = asset_event.extra["payload"] # type: ignore[attr-defined] - else: - details = asset_event.extra # type: ignore[attr-defined] + for asset_event in asset_events: # type: ignore[attr-defined] + if asset_event.extra.get("from_trigger", False): + details = asset_event.extra["payload"] + else: + details = asset_event.extra - session.post( - details.get("webhook_url"), - json={ - "username": details.get("username"), - "content": details.get("content"), - }, - ) + session.post( + details.get("webhook_url"), + json={ + "username": details.get("username"), + "content": details.get("content"), + }, + ) send_discord_message() diff --git a/dags/app/proposal_reminder/dag.py b/dags/app/proposal_reminder/dag.py index d78b2db5..7815f6e8 100644 --- a/dags/app/proposal_reminder/dag.py +++ b/dags/app/proposal_reminder/dag.py @@ -2,50 +2,34 @@ Send Proposal Summary to Discord """ -from datetime import datetime, timedelta - -from airflow.sdk import Asset, Metadata, Variable, dag, task +from airflow.sdk import Metadata, Variable, asset from app.proposal_reminder.udf import get_proposal_summary -DEFAULT_ARGS = { - "owner": "Henry Lee", - "depends_on_past": False, - "start_date": datetime(2025, 2, 25), - "end_date": datetime(2025, 4, 9), - "retries": 2, - "retry_delay": timedelta(minutes=5), -} - +# DEFAULT_ARGS = { +# "owner": "Henry Lee", +# "depends_on_past": False, +# "start_date": datetime(2025, 2, 25), +# "end_date": datetime(2025, 4, 9), +# "retries": 2, +# "retry_delay": timedelta(minutes=5), +# } -cfp_summary_asset = Asset(name="CFP_summary") - -@dag( - default_args=DEFAULT_ARGS, +@asset( + name="proposal_count", + dag_id="proposal_count", schedule="0 16 * * *", # At 16:00 (00:00 +8) - max_active_runs=1, - catchup=False, ) -def DISCORD_PROPOSAL_REMINDER_v3(): - @task(outlets=[cfp_summary_asset]) - def fetch_proposal_summary(): - summary = get_proposal_summary() - n_talk = summary["num_proposed_talk"] - n_tutorial = summary["num_proposed_tutorial"] - - yield Metadata( - cfp_summary_asset, - extra={ - "webhook_url": Variable.get("DISCORD_PROGRAM_REMINDER_WEBHOOK"), - "username": "Program talk reminder", - "content": f"目前投稿議程數: {n_talk}; 課程數: {n_tutorial}", - }, - ) - - fetch_proposal_summary() - - -dag_obj = DISCORD_PROPOSAL_REMINDER_v3() - -if __name__ == "__main__": - dag_obj.test() +def DISCORD_PROPOSAL_REMINDER_v3(self): + summary = get_proposal_summary() + n_talk = summary["num_proposed_talk"] + n_tutorial = summary["num_proposed_tutorial"] + + yield Metadata( + self, + extra={ + "webhook_url": Variable.get("DISCORD_PROGRAM_REMINDER_WEBHOOK"), + "username": "Program talk reminder", + "content": f"目前投稿議程數: {n_talk}; 課程數: {n_tutorial}", + }, + ) diff --git a/dags/app/team_registration_bot/dag.py b/dags/app/team_registration_bot/dag.py index 785fe2d8..af531445 100644 --- a/dags/app/team_registration_bot/dag.py +++ b/dags/app/team_registration_bot/dag.py @@ -2,51 +2,34 @@ Send daily ordering metrics to discord channel """ -from datetime import datetime, timedelta - -from airflow.sdk import Asset, Metadata, Variable, dag, task +from airflow.sdk import Metadata, Variable, asset from app.team_registration_bot.udf import ( _compose_discord_msg, _get_statistics_from_bigquery, ) -DEFAULT_ARGS = { - "owner": "David Jr.", - "depends_on_past": False, - "start_date": datetime(2022, 7, 4), - "retries": 2, - "retry_delay": timedelta(minutes=5), - "on_failure_callback": lambda x: "Need to send notification to Discord!", -} - +# DEFAULT_ARGS = { +# "owner": "David Jr.", +# "depends_on_past": False, +# "start_date": datetime(2022, 7, 4), +# "retries": 2, +# "retry_delay": timedelta(minutes=5), +# "on_failure_callback": lambda x: "Need to send notification to Discord!", +# } -kktix_order_report_asset = Asset(name="kktix_order_report") - -@dag( - default_args=DEFAULT_ARGS, +@asset( + name="registration_statistics", + dag_id="registration_statistics", schedule="@daily", - max_active_runs=1, - catchup=False, ) -def KKTIX_DISCORD_BOT_FOR_TEAM_REGISTRATION(): - @task(outlets=[kktix_order_report_asset]) - def LOAD_TO_DISCORD(): - statistics = _get_statistics_from_bigquery() - yield Metadata( - kktix_order_report_asset, - extra={ - "webhook_url": Variable.get("discord_webhook_registration_endpoint"), - "username": "KKTIX order report", - "content": _compose_discord_msg(statistics), - }, - ) - - LOAD_TO_DISCORD() - - -dag_obj = KKTIX_DISCORD_BOT_FOR_TEAM_REGISTRATION() - - -if __name__ == "__main__": - dag_obj.test() +def KKTIX_DISCORD_BOT_FOR_TEAM_REGISTRATION(self): + statistics = _get_statistics_from_bigquery() + yield Metadata( + self, + extra={ + "webhook_url": Variable.get("discord_webhook_registration_endpoint"), + "username": "KKTIX order report", + "content": _compose_discord_msg(statistics), + }, + ) From eb571e41722432ef932be952ded062a1fa5896f6 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Sun, 6 Jul 2025 11:50:37 +0800 Subject: [PATCH 4/4] refactor: rewrite discord message sending through httphook --- dags/app/discord.py | 48 ++++++++++++++++++--------- dags/app/proposal_reminder/dag.py | 8 ++--- dags/app/team_registration_bot/dag.py | 21 +++++------- dags/app/team_registration_bot/udf.py | 6 ++-- triggers/finance_report.py | 2 +- 5 files changed, 48 insertions(+), 37 deletions(-) diff --git a/dags/app/discord.py b/dags/app/discord.py index 6f4f7599..f666bf16 100644 --- a/dags/app/discord.py +++ b/dags/app/discord.py @@ -1,12 +1,16 @@ import logging -from datetime import datetime, timedelta +from datetime import datetime import requests -from airflow.sdk import Asset, AssetWatcher, Context, dag, task +import tenacity +from airflow.providers.http.hooks.http import HttpHook +from airflow.sdk import Asset, AssetWatcher, Context, Variable, dag, task from triggers.finance_report import FinanceReportTrigger -logger = logging.getLogger(__name__) +# get the airflow.task logger +task_logger = logging.getLogger("airflow.task") + finance_report_asset = Asset( name="finance_report", @@ -14,8 +18,7 @@ AssetWatcher( name="finance_report_watcher", trigger=FinanceReportTrigger( - # poke_interval=86400, # 60*60*24 - poke_interval=5, # 60*60*24 + poke_interval=86400, # 60*60*24 ), ) ], @@ -37,31 +40,44 @@ }, ) def discord_message_notification(): - """Send Discord Message""" + """Send Discord Message.""" - @task( - retries=10, - retry_delay=timedelta(seconds=10), - ) + @task def send_discord_message(**context: Context) -> None: triggering_asset_events = context["triggering_asset_events"] - session = requests.session() - logger.info(f"Receive asset events {triggering_asset_events}") for asset_uri, asset_events in triggering_asset_events.items(): - logger.info(f"Receive asset event from Asset uri={asset_uri}") + task_logger.info(f"Receive asset event from Asset uri={asset_uri}") + + http_hook = HttpHook(method="POST", http_conn_id="discord_webhook") for asset_event in asset_events: # type: ignore[attr-defined] if asset_event.extra.get("from_trigger", False): details = asset_event.extra["payload"] else: details = asset_event.extra - session.post( - details.get("webhook_url"), - json={ + if not details: + task_logger.error( + f"Detail {details} cannot be empty. It's required to send discord message." + ) + continue + + task_logger.info("Start sending discord message") + endpoint = Variable.get(details.get("webhook_endpoint_key")) + http_hook.run_with_advanced_retry( + endpoint=endpoint, + data={ "username": details.get("username"), "content": details.get("content"), }, + _retry_args=dict( + wait=tenacity.wait_random(min=1, max=10), + stop=tenacity.stop_after_attempt(10), + retry=tenacity.retry_if_exception_type( + requests.exceptions.ConnectionError + ), + ), ) + task_logger.info("Discord message sent") send_discord_message() diff --git a/dags/app/proposal_reminder/dag.py b/dags/app/proposal_reminder/dag.py index 7815f6e8..faf40af2 100644 --- a/dags/app/proposal_reminder/dag.py +++ b/dags/app/proposal_reminder/dag.py @@ -2,7 +2,7 @@ Send Proposal Summary to Discord """ -from airflow.sdk import Metadata, Variable, asset +from airflow.sdk import Metadata, asset from app.proposal_reminder.udf import get_proposal_summary # DEFAULT_ARGS = { @@ -16,11 +16,9 @@ @asset( - name="proposal_count", - dag_id="proposal_count", schedule="0 16 * * *", # At 16:00 (00:00 +8) ) -def DISCORD_PROPOSAL_REMINDER_v3(self): +def proposal_count(self): summary = get_proposal_summary() n_talk = summary["num_proposed_talk"] n_tutorial = summary["num_proposed_tutorial"] @@ -28,7 +26,7 @@ def DISCORD_PROPOSAL_REMINDER_v3(self): yield Metadata( self, extra={ - "webhook_url": Variable.get("DISCORD_PROGRAM_REMINDER_WEBHOOK"), + "webhook_endpoint_key": "DISCORD_PROGRAM_REMINDER_WEBHOOK", "username": "Program talk reminder", "content": f"目前投稿議程數: {n_talk}; 課程數: {n_tutorial}", }, diff --git a/dags/app/team_registration_bot/dag.py b/dags/app/team_registration_bot/dag.py index af531445..147b4e31 100644 --- a/dags/app/team_registration_bot/dag.py +++ b/dags/app/team_registration_bot/dag.py @@ -2,10 +2,10 @@ Send daily ordering metrics to discord channel """ -from airflow.sdk import Metadata, Variable, asset +from airflow.sdk import Metadata, asset from app.team_registration_bot.udf import ( - _compose_discord_msg, - _get_statistics_from_bigquery, + compose_discord_msg, + get_statistics_from_bigquery, ) # DEFAULT_ARGS = { @@ -18,18 +18,15 @@ # } -@asset( - name="registration_statistics", - dag_id="registration_statistics", - schedule="@daily", -) -def KKTIX_DISCORD_BOT_FOR_TEAM_REGISTRATION(self): - statistics = _get_statistics_from_bigquery() +@asset(schedule="@daily") +def registration_statistics(self): + # KKTIX_DISCORD_BOT_FOR_TEAM_REGISTRATION + statistics = get_statistics_from_bigquery() yield Metadata( self, extra={ - "webhook_url": Variable.get("discord_webhook_registration_endpoint"), + "webhook_endpoint_key": "discord_webhook_registration_endpoint", "username": "KKTIX order report", - "content": _compose_discord_msg(statistics), + "content": compose_discord_msg(statistics), }, ) diff --git a/dags/app/team_registration_bot/udf.py b/dags/app/team_registration_bot/udf.py index 57d75711..8145755d 100644 --- a/dags/app/team_registration_bot/udf.py +++ b/dags/app/team_registration_bot/udf.py @@ -11,7 +11,7 @@ CLIENT = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT")) -def _get_statistics_from_bigquery() -> RowIterator: +def get_statistics_from_bigquery() -> RowIterator: query_job = CLIENT.query( f""" WITH UNIQUE_RECORDS AS ( @@ -55,10 +55,10 @@ def _get_statistics_from_bigquery() -> RowIterator: } -def _compose_discord_msg(payload) -> str: +def compose_discord_msg(payload) -> str: msg = ( f"Hi 這是今天 {datetime.now().date()} 的票種統計資料," - "售票期結束後,請 follow README 的 `gcloud` 指令進去把 Airflow DAG 關掉\n\n" + "售票期結束後,請 follow README 的 `gcloud` 指令進去把 Airflow dag 關掉\n\n" ) total = 0 total_income = 0 diff --git a/triggers/finance_report.py b/triggers/finance_report.py index 52e55a7a..12445311 100644 --- a/triggers/finance_report.py +++ b/triggers/finance_report.py @@ -125,7 +125,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: msg = format_diff_df_as_message(df_diff) yield TriggerEvent( { - "webhook_url": Variable.get("discord_data_stratagy_webhook"), + "webhook_endpoint_key": "discord_data_stratagy_webhook", "username": "財務機器人", "content": msg, }