Skip to content

feat(discord): move all discord message sending to dag "discord_message_notification" and use Asset to trigger #167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ ENV PYTHONPATH="${AIRFLOW_HOME}:$PYTHONPATH"

COPY airflow.cfg ${AIRFLOW_HOME}/airflow.cfg
COPY --chown=airflow:root dags ${AIRFLOW_HOME}/dags
COPY --chown=airflow:root triggers ${AIRFLOW_HOME}/triggers
# TODO: remove this and the patch files once upgrade to 3.0.3
COPY --chown=airflow:root patch/utils.py /app/.venv/lib/python3.10/site-packages/airflow/migrations/utils.py
COPY --chown=airflow:root patch/versions /app/.venv/lib/python3.10/site-packages/airflow/migrations/versions
Expand Down
1 change: 1 addition & 0 deletions Dockerfile.test
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ ENV PYTHONPATH="${AIRFLOW_HOME}:$PYTHONPATH"

COPY airflow.cfg ${AIRFLOW_HOME}/airflow.cfg
COPY --chown=airflow:root dags ${AIRFLOW_HOME}/dags
COPY --chown=airflow:root triggers ${AIRFLOW_HOME}/triggers
# TODO: remove this and the patch files once upgrade to 3.0.3
COPY --chown=airflow:root patch/utils.py /app/.venv/lib/python3.10/site-packages/airflow/migrations/utils.py
COPY --chown=airflow:root patch/versions /app/.venv/lib/python3.10/site-packages/airflow/migrations/versions
Expand Down
92 changes: 81 additions & 11 deletions dags/app/discord.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,88 @@
import logging
from datetime import datetime

import requests
import tenacity
from airflow.providers.http.hooks.http import HttpHook
from airflow.sdk import Asset, AssetWatcher, Context, Variable, dag, task

from triggers.finance_report import FinanceReportTrigger

# get the airflow.task logger
task_logger = logging.getLogger("airflow.task")


finance_report_asset = Asset(
name="finance_report",
watchers=[
AssetWatcher(
name="finance_report_watcher",
trigger=FinanceReportTrigger(
poke_interval=86400, # 60*60*24
),
)
],
)

session = requests.session()

RETRY_ARGS = dict(
wait=tenacity.wait_random(min=1, max=10),
stop=tenacity.stop_after_attempt(10),
retry=tenacity.retry_if_exception_type(requests.exceptions.ConnectionError),
@dag(
schedule=(
finance_report_asset
| Asset(name="proposal_count")
| Asset(name="registration_statistics")
),
start_date=datetime(2025, 6, 28),
catchup=False,
max_active_runs=1,
default_args={
"owner": "Wei Lee",
"depends_on_past": False,
},
)
def discord_message_notification():
"""Send Discord Message."""

@task
def send_discord_message(**context: Context) -> None:
triggering_asset_events = context["triggering_asset_events"]
for asset_uri, asset_events in triggering_asset_events.items():
task_logger.info(f"Receive asset event from Asset uri={asset_uri}")

http_hook = HttpHook(method="POST", http_conn_id="discord_webhook")
for asset_event in asset_events: # type: ignore[attr-defined]
if asset_event.extra.get("from_trigger", False):
details = asset_event.extra["payload"]
else:
details = asset_event.extra

if not details:
task_logger.error(
f"Detail {details} cannot be empty. It's required to send discord message."
)
continue

task_logger.info("Start sending discord message")
endpoint = Variable.get(details.get("webhook_endpoint_key"))
http_hook.run_with_advanced_retry(
endpoint=endpoint,
data={
"username": details.get("username"),
"content": details.get("content"),
},
_retry_args=dict(
wait=tenacity.wait_random(min=1, max=10),
stop=tenacity.stop_after_attempt(10),
retry=tenacity.retry_if_exception_type(
requests.exceptions.ConnectionError
),
),
)
task_logger.info("Discord message sent")

send_discord_message()


dag_obj = discord_message_notification()

@tenacity.retry(**RETRY_ARGS) # type: ignore[call-overload]
def send_webhook_message(webhook_url: str, username: str, msg: str) -> None:
session.post(
webhook_url,
json={"username": username, "content": msg},
)
if __name__ == "__main__":
dag_obj.test()
63 changes: 0 additions & 63 deletions dags/app/finance_bot/dag.py

This file was deleted.

79 changes: 0 additions & 79 deletions dags/app/finance_bot/udf.py

This file was deleted.

65 changes: 24 additions & 41 deletions dags/app/proposal_reminder/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,32 @@
Send Proposal Summary to Discord
"""

from datetime import datetime, timedelta
from airflow.sdk import Metadata, asset
from app.proposal_reminder.udf import get_proposal_summary

from airflow.sdk import Variable, dag, task
# DEFAULT_ARGS = {
# "owner": "Henry Lee",
# "depends_on_past": False,
# "start_date": datetime(2025, 2, 25),
# "end_date": datetime(2025, 4, 9),
# "retries": 2,
# "retry_delay": timedelta(minutes=5),
# }

from dags.app import discord
from dags.app.proposal_reminder.udf import get_proposal_summary

DEFAULT_ARGS = {
"owner": "Henry Lee",
"depends_on_past": False,
"start_date": datetime(2025, 2, 25),
"end_date": datetime(2025, 4, 9),
"retries": 2,
"retry_delay": timedelta(minutes=5),
}


@dag(
default_args=DEFAULT_ARGS,
@asset(
schedule="0 16 * * *", # At 16:00 (00:00 +8)
max_active_runs=1,
catchup=False,
)
def DISCORD_PROPOSAL_REMINDER_v3():
@task
def SEND_PROPOSAL_SUMMARY():
webhook_url = Variable.get("DISCORD_PROGRAM_REMINDER_WEBHOOK")

summary = get_proposal_summary()
n_talk = summary["num_proposed_talk"]
n_tutorial = summary["num_proposed_tutorial"]
msg = f"目前投稿議程數: {n_talk}; 課程數: {n_tutorial}"

discord.send_webhook_message(
webhook_url=webhook_url,
username="Program talk reminder",
msg=msg,
)

SEND_PROPOSAL_SUMMARY()


dag_obj = DISCORD_PROPOSAL_REMINDER_v3()

if __name__ == "__main__":
dag_obj.test()
def proposal_count(self):
summary = get_proposal_summary()
n_talk = summary["num_proposed_talk"]
n_tutorial = summary["num_proposed_tutorial"]

yield Metadata(
self,
extra={
"webhook_endpoint_key": "DISCORD_PROGRAM_REMINDER_WEBHOOK",
"username": "Program talk reminder",
"content": f"目前投稿議程數: {n_talk}; 課程數: {n_tutorial}",
},
)
Loading