From 7cc2c6b05475c399f80044a2ae3c5372a577bccb Mon Sep 17 00:00:00 2001 From: tomatoprinx Date: Tue, 27 Dec 2022 01:21:21 +0800 Subject: [PATCH 1/3] add backfill and ended_event options to extract different year's kktix data --- README.md | 1 + .../kktix_ticket_orders/kktix_backfill_dag.py | 41 +++++++++++++++++++ .../kktix_ticket_orders/sqls/create_table.sql | 3 +- .../udfs/bigquery_loader.py | 2 +- .../ods/kktix_ticket_orders/udfs/kktix_api.py | 32 ++++++++------- 5 files changed, 63 insertions(+), 16 deletions(-) create mode 100644 dags/ods/kktix_ticket_orders/kktix_backfill_dag.py diff --git a/README.md b/README.md index 532e648..8c91697 100644 --- a/README.md +++ b/README.md @@ -127,6 +127,7 @@ Please use Gitlab Flow, otherwise you cannot pass dockerhub CI * KLAVIYO_LIST_ID: Create from https://www.klaviyo.com/lists * KLAVIYO_CAMPAIGN_ID: Create from https://www.klaviyo.com/campaigns * kktix_events_endpoint: url path of kktix's `hosting_events`, ask @gtb for details! + * kktix_only_not_ended_events: decide whether to only retrieve ended events ### CI/CD diff --git a/dags/ods/kktix_ticket_orders/kktix_backfill_dag.py b/dags/ods/kktix_ticket_orders/kktix_backfill_dag.py new file mode 100644 index 0000000..62e9e7f --- /dev/null +++ b/dags/ods/kktix_ticket_orders/kktix_backfill_dag.py @@ -0,0 +1,41 @@ +""" +Ingest KKTIX's data and load them to BigQuery every 5mins +""" +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.operators.python_operator import PythonOperator +from ods.kktix_ticket_orders.udfs import bigquery_loader, gather_town_loader, kktix_api + +DEFAULT_ARGS = { + "owner": "bc842017@gmail.com", + "depends_on_past": False, + "start_date": datetime(2022, 10, 30), + "retries": 2, + "retry_delay": timedelta(minutes=5), + "on_failure_callback": lambda x: "Need to send notification to Discord!", +} +dag = DAG( + "KKTIX_TICKET_BACKFILL_ORDERS_V1", + default_args=DEFAULT_ARGS, + schedule_interval="*/5 * * * *", + max_active_runs=1, + catchup=True, +) +with dag: + CREATE_TABLE_IF_NEEDED = PythonOperator( + task_id="CREATE_TABLE_IF_NEEDED", + python_callable=bigquery_loader.create_table_if_needed, + ) + + GET_ATTENDEE_INFOS = PythonOperator( + task_id="GET_ATTENDEE_INFOS", + python_callable=kktix_api.main, + provide_context=True, + op_kwargs={"backfill": True} + ) + + CREATE_TABLE_IF_NEEDED >> GET_ATTENDEE_INFOS + +if __name__ == "__main__": + dag.cli() diff --git a/dags/ods/kktix_ticket_orders/sqls/create_table.sql b/dags/ods/kktix_ticket_orders/sqls/create_table.sql index a0a5394..9c7a883 100644 --- a/dags/ods/kktix_ticket_orders/sqls/create_table.sql +++ b/dags/ods/kktix_ticket_orders/sqls/create_table.sql @@ -2,5 +2,6 @@ CREATE TABLE IF NOT EXISTS `{}` ( ID INT64 NOT NULL, NAME STRING NOT NULL, - ATTENDEE_INFO STRING NOT NULL + ATTENDEE_INFO STRING NOT NULL, + REFUNDED BOOLEAN FALSE ); diff --git a/dags/ods/kktix_ticket_orders/udfs/bigquery_loader.py b/dags/ods/kktix_ticket_orders/udfs/bigquery_loader.py index f306aa0..9fdd429 100644 --- a/dags/ods/kktix_ticket_orders/udfs/bigquery_loader.py +++ b/dags/ods/kktix_ticket_orders/udfs/bigquery_loader.py @@ -3,7 +3,7 @@ from google.cloud import bigquery -TABLE = f"{os.getenv('BIGQUERY_PROJECT')}.ods.ods_kktix_attendeeId_datetime" +TABLE = f"{os.getenv('BIGQUERY_PROJECT')}.ods.ods_kktix_attendeeId_datetime_copy2" # since backfill would insert duplicate records, we need this dedupe to make it idempotent DEDUPE_SQL = f""" CREATE OR REPLACE TABLE diff --git a/dags/ods/kktix_ticket_orders/udfs/kktix_api.py b/dags/ods/kktix_ticket_orders/udfs/kktix_api.py index ea4f97e..e2775d7 100644 --- a/dags/ods/kktix_ticket_orders/udfs/kktix_api.py +++ b/dags/ods/kktix_ticket_orders/udfs/kktix_api.py @@ -27,7 +27,8 @@ def main(**context): ts_datetime_obj = parse(context["ts"]) year = ts_datetime_obj.year timestamp = ts_datetime_obj.timestamp() - event_raw_data_array = _extract(year=year, timestamp=timestamp,) + is_backfill = context["backfill"] + event_raw_data_array = _extract(year=year, timestamp=timestamp, backfill=is_backfill) transformed_event_raw_data_array = kktix_transformer.transform( copy.deepcopy(event_raw_data_array) ) @@ -40,7 +41,7 @@ def main(**context): ) -def _extract(year: int, timestamp: float) -> List[Dict]: +def _extract(year: int, timestamp: float, backfill: bool()) -> List[Dict]: """ get data from KKTIX's API 1. condition_filter_callb: use this callbacl to filter out unwanted event! @@ -54,7 +55,7 @@ def _extract(year: int, timestamp: float) -> List[Dict]: event_metadatas = get_event_metadatas(condition_filter_callback) for event_metadata in event_metadatas: event_id = event_metadata["id"] - for attendee_info in get_attendee_infos(event_id, timestamp): + for attendee_info in get_attendee_infos(event_id, timestamp, backfill): event_raw_data_array.append( { "id": event_id, @@ -65,13 +66,13 @@ def _extract(year: int, timestamp: float) -> List[Dict]: return event_raw_data_array -def get_attendee_infos(event_id: int, timestamp: float) -> List: +def get_attendee_infos(event_id: int, timestamp: float, backfill: bool()) -> List: """ it's a public wrapper for people to get attendee infos! """ attendance_book_id = _get_attendance_book_id(event_id) attendee_ids = _get_attendee_ids(event_id, attendance_book_id) - attendee_infos = _get_attendee_infos(event_id, attendee_ids, timestamp) + attendee_infos = _get_attendee_infos(event_id, attendee_ids, timestamp, backfill) return attendee_infos @@ -80,7 +81,7 @@ def get_event_metadatas(condition_filter: Callable) -> List[Dict]: Fetch all the ongoing events """ event_list_resp = HTTP_HOOK.run_with_advanced_retry( - endpoint=f"{Variable.get('kktix_events_endpoint')}?only_not_ended_event=true", + endpoint=f"{Variable.get('kktix_events_endpoint')}?only_not_ended_event={Variable.get('kktix_only_not_ended_events')}", _retry_args=RETRY_ARGS, ).json() event_metadatas: List[dict] = [] @@ -116,7 +117,7 @@ def _get_attendee_ids(event_id: int, attendance_book_id: int) -> List[int]: def _get_attendee_infos( - event_id: int, attendee_ids: List[int], timestamp: float + event_id: int, attendee_ids: List[int], timestamp: float, backfill: bool() ) -> List: """ get attendee infos, e.g. email, phonenumber, name and etc @@ -127,12 +128,15 @@ def _get_attendee_infos( endpoint=f"{Variable.get('kktix_events_endpoint')}/{event_id}/attendees/{attendee_id}", _retry_args=RETRY_ARGS, ).json() - if not attendee_info["is_paid"]: - continue - if ( - timestamp - < attendee_info["updated_at"] - < timestamp + SCHEDULE_INTERVAL_SECONDS - ): + if backfill: attendee_infos.append(attendee_info) + else: + if not attendee_info["is_paid"]: + continue + if ( + timestamp + < attendee_info["updated_at"] + < timestamp + SCHEDULE_INTERVAL_SECONDS + ): + attendee_infos.append(attendee_info) return attendee_infos From 674efe92d716129cf9d2cfaf42cc6272c029af94 Mon Sep 17 00:00:00 2001 From: tomatoprinx Date: Sun, 12 Mar 2023 23:14:04 +0800 Subject: [PATCH 2/3] resolve advice from review --- dags/ods/kktix_ticket_orders/udfs/kktix_api.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/dags/ods/kktix_ticket_orders/udfs/kktix_api.py b/dags/ods/kktix_ticket_orders/udfs/kktix_api.py index e2775d7..d9f01ff 100644 --- a/dags/ods/kktix_ticket_orders/udfs/kktix_api.py +++ b/dags/ods/kktix_ticket_orders/udfs/kktix_api.py @@ -28,7 +28,9 @@ def main(**context): year = ts_datetime_obj.year timestamp = ts_datetime_obj.timestamp() is_backfill = context["backfill"] - event_raw_data_array = _extract(year=year, timestamp=timestamp, backfill=is_backfill) + event_raw_data_array = _extract( + year=year, timestamp=timestamp, backfill=is_backfill + ) transformed_event_raw_data_array = kktix_transformer.transform( copy.deepcopy(event_raw_data_array) ) @@ -41,7 +43,7 @@ def main(**context): ) -def _extract(year: int, timestamp: float, backfill: bool()) -> List[Dict]: +def _extract(year: int, timestamp: float, backfill: bool) -> List[Dict]: """ get data from KKTIX's API 1. condition_filter_callb: use this callbacl to filter out unwanted event! @@ -66,7 +68,7 @@ def _extract(year: int, timestamp: float, backfill: bool()) -> List[Dict]: return event_raw_data_array -def get_attendee_infos(event_id: int, timestamp: float, backfill: bool()) -> List: +def get_attendee_infos(event_id: int, timestamp: float, backfill: bool) -> List: """ it's a public wrapper for people to get attendee infos! """ @@ -117,7 +119,7 @@ def _get_attendee_ids(event_id: int, attendance_book_id: int) -> List[int]: def _get_attendee_infos( - event_id: int, attendee_ids: List[int], timestamp: float, backfill: bool() + event_id: int, attendee_ids: List[int], timestamp: float, backfill: bool ) -> List: """ get attendee infos, e.g. email, phonenumber, name and etc From d027373f6998dc17061beb1e090ae81826ec150a Mon Sep 17 00:00:00 2001 From: tomatoprinx Date: Sun, 12 Mar 2023 23:26:43 +0800 Subject: [PATCH 3/3] reformat --- dags/ods/kktix_ticket_orders/kktix_backfill_dag.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dags/ods/kktix_ticket_orders/kktix_backfill_dag.py b/dags/ods/kktix_ticket_orders/kktix_backfill_dag.py index 62e9e7f..612be4d 100644 --- a/dags/ods/kktix_ticket_orders/kktix_backfill_dag.py +++ b/dags/ods/kktix_ticket_orders/kktix_backfill_dag.py @@ -5,7 +5,7 @@ from airflow import DAG from airflow.operators.python_operator import PythonOperator -from ods.kktix_ticket_orders.udfs import bigquery_loader, gather_town_loader, kktix_api +from ods.kktix_ticket_orders.udfs import bigquery_loader, kktix_api DEFAULT_ARGS = { "owner": "bc842017@gmail.com", @@ -32,7 +32,7 @@ task_id="GET_ATTENDEE_INFOS", python_callable=kktix_api.main, provide_context=True, - op_kwargs={"backfill": True} + op_kwargs={"backfill": True}, ) CREATE_TABLE_IF_NEEDED >> GET_ATTENDEE_INFOS