Skip to content

Commit 8b505ed

Browse files
committed
refactor(dag:*post_insights): deduplicate common code with *PostsInsightsParser
1 parent 1f185eb commit 8b505ed

File tree

14 files changed

+624
-869
lines changed

14 files changed

+624
-869
lines changed

dags/ods/fb_post_insights/dag.py

Lines changed: 3 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,7 @@
1-
import os
21
from datetime import datetime, timedelta
32

43
from airflow.decorators import dag, task
5-
from google.cloud import bigquery
6-
from ods.fb_post_insights.udfs import (
7-
convert_fb_time,
8-
dump_posts_insights_to_bigquery,
9-
dump_posts_to_bigquery,
10-
query_last_post,
11-
request_posts_data,
12-
)
4+
from utils.posts_insights.facebook import FacebookPostsInsightsParser
135

146
DEFAULT_ARGS = {
157
"owner": "CHWan",
@@ -30,71 +22,11 @@
3022
def FB_POST_INSIGHTS_V1():
3123
@task
3224
def CREATE_TABLE_IF_NEEDED():
33-
create_post_table_sql = """
34-
CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_fb_posts` (
35-
id STRING,
36-
created_at TIMESTAMP,
37-
message STRING
38-
)
39-
"""
40-
create_insights_table_sql = """
41-
CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_fb_posts_insights` (
42-
post_id STRING,
43-
query_time TIMESTAMP,
44-
comments INTEGER,
45-
reactions INTEGER,
46-
share INTEGER
47-
)
48-
"""
49-
50-
bg_project = os.getenv("BIGQUERY_PROJECT")
51-
client = bigquery.Client(project=bg_project)
52-
client.query(create_post_table_sql)
53-
client.query(create_insights_table_sql)
25+
FacebookPostsInsightsParser().create_tables_if_not_exists()
5426

5527
@task
5628
def SAVE_FB_POSTS_AND_INSIGHTS():
57-
posts = request_posts_data()
58-
59-
last_post = query_last_post()
60-
new_posts = (
61-
[
62-
post
63-
for post in posts
64-
if datetime.strptime(
65-
post["created_time"], "%Y-%m-%dT%H:%M:%S%z"
66-
).timestamp()
67-
> last_post["created_at"].timestamp()
68-
]
69-
if last_post is not None
70-
else posts
71-
)
72-
73-
if not dump_posts_to_bigquery(
74-
[
75-
{
76-
"id": post["id"],
77-
"created_at": convert_fb_time(post["created_time"]),
78-
"message": post.get("message", "No message found"),
79-
}
80-
for post in new_posts
81-
]
82-
):
83-
raise RuntimeError("Failed to dump posts to BigQuery")
84-
85-
if not dump_posts_insights_to_bigquery(
86-
[
87-
{
88-
"post_id": post["id"],
89-
"query_time": datetime.now().timestamp(),
90-
"comments": post["comments"]["summary"]["total_count"],
91-
"reactions": post["reactions"]["summary"]["total_count"],
92-
"share": post.get("shares", {}).get("count", 0),
93-
}
94-
for post in posts
95-
]
96-
):
97-
raise RuntimeError("Failed to dump posts insights to BigQuery")
29+
FacebookPostsInsightsParser().save_posts_and_insights()
9830

9931
CREATE_TABLE_IF_NEEDED() >> SAVE_FB_POSTS_AND_INSIGHTS()
10032

dags/ods/fb_post_insights/udfs.py

Lines changed: 0 additions & 173 deletions
This file was deleted.

dags/ods/ig_post_insights/dags.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from datetime import datetime, timedelta
22

33
from airflow.decorators import dag, task
4-
from ods.ig_post_insights import udfs
4+
from utils.posts_insights.instagram import InstagramPostsInsightsParser
55

66
DEFAULT_ARGS = {
77
"owner": "Angus Yang",
@@ -22,13 +22,13 @@
2222
def IG_POST_INSIGHTS_V1():
2323
@task
2424
def CREATE_TABLE_IF_NEEDED():
25-
udfs.create_table_if_needed()
25+
InstagramPostsInsightsParser().create_tables_if_not_exists()
2626

2727
@task
28-
def SAVE_TWITTER_POSTS_AND_INSIGHTS():
29-
udfs.save_posts_and_insights()
28+
def SAVE_IG_POSTS_AND_INSIGHTS():
29+
InstagramPostsInsightsParser().save_posts_and_insights()
3030

31-
CREATE_TABLE_IF_NEEDED() >> SAVE_TWITTER_POSTS_AND_INSIGHTS()
31+
CREATE_TABLE_IF_NEEDED() >> SAVE_IG_POSTS_AND_INSIGHTS()
3232

3333

3434
dag_obj = IG_POST_INSIGHTS_V1()

0 commit comments

Comments
 (0)