Skip to content

Commit 1f185eb

Browse files
committed
refactor(dag:FB_POST_INSIGHTS_V1): improve typing and simplify logic
1 parent 2a401ea commit 1f185eb

File tree

4 files changed

+24
-17
lines changed

4 files changed

+24
-17
lines changed

dags/ods/fb_post_insights/dag.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,14 @@
3030
def FB_POST_INSIGHTS_V1():
3131
@task
3232
def CREATE_TABLE_IF_NEEDED():
33-
client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
34-
post_sql = """
33+
create_post_table_sql = """
3534
CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_fb_posts` (
3635
id STRING,
3736
created_at TIMESTAMP,
3837
message STRING
3938
)
4039
"""
41-
client.query(post_sql)
42-
insights_sql = """
40+
create_insights_table_sql = """
4341
CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_fb_posts_insights` (
4442
post_id STRING,
4543
query_time TIMESTAMP,
@@ -48,24 +46,29 @@ def CREATE_TABLE_IF_NEEDED():
4846
share INTEGER
4947
)
5048
"""
51-
client.query(insights_sql)
49+
50+
bg_project = os.getenv("BIGQUERY_PROJECT")
51+
client = bigquery.Client(project=bg_project)
52+
client.query(create_post_table_sql)
53+
client.query(create_insights_table_sql)
5254

5355
@task
5456
def SAVE_FB_POSTS_AND_INSIGHTS():
5557
posts = request_posts_data()
5658

5759
last_post = query_last_post()
58-
if last_post is None:
59-
new_posts = posts
60-
else:
61-
new_posts = [
60+
new_posts = (
61+
[
6262
post
6363
for post in posts
6464
if datetime.strptime(
6565
post["created_time"], "%Y-%m-%dT%H:%M:%S%z"
6666
).timestamp()
6767
> last_post["created_at"].timestamp()
6868
]
69+
if last_post is not None
70+
else posts
71+
)
6972

7073
if not dump_posts_to_bigquery(
7174
[

dags/ods/fb_post_insights/udfs.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,12 @@ def dump_posts_to_bigquery(posts: list[dict]) -> bool:
113113
client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
114114
job_config = bigquery.LoadJobConfig(
115115
schema=[
116-
bigquery.SchemaField("id", "STRING", mode="REQUIRED"),
117-
bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"),
118-
bigquery.SchemaField("message", "STRING", mode="REQUIRED"),
116+
bigquery.SchemaField(field_name, field_type, mode="REQUIRED")
117+
for field_name, field_type in [
118+
("id", "STRING"),
119+
("created_at", "TIMESTAMP"),
120+
("message", "STRING"),
121+
]
119122
],
120123
write_disposition="WRITE_APPEND",
121124
)
@@ -128,6 +131,7 @@ def dump_posts_to_bigquery(posts: list[dict]) -> bool:
128131
job.result()
129132
return True
130133
except Exception as e:
134+
# TODO: catch a more specific exception
131135
logger.error(f"Failed to dump posts to BigQuery: {e}", exc_info=True)
132136
return False
133137

dags/ods/ig_post_insights/udfs.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,13 @@
2727

2828

2929
def create_table_if_needed() -> None:
30-
client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
3130
post_sql = """
3231
CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_ig_posts` (
3332
id STRING,
3433
created_at TIMESTAMP,
3534
message STRING
3635
)
3736
"""
38-
client.query(post_sql)
3937
insights_sql = """
4038
CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_ig_posts_insights` (
4139
post_id STRING,
@@ -47,6 +45,9 @@ def create_table_if_needed() -> None:
4745
views INTEGER
4846
)
4947
"""
48+
49+
client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
50+
client.query(post_sql)
5051
client.query(insights_sql)
5152

5253

@@ -122,7 +123,6 @@ def request_posts_data() -> list[dict]:
122123
media_list = response.json()["data"]
123124

124125
media_insight_list = []
125-
126126
for media in media_list:
127127
media_insight_url = f"https://graph.facebook.com/v20.0/{media['id']}"
128128
querystring = {

dags/ods/linkedin_post_insights/udfs.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,13 @@
1010

1111

1212
def create_table_if_needed() -> None:
13-
client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
1413
post_sql = """
1514
CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_linkedin_posts` (
1615
id STRING,
1716
created_at TIMESTAMP,
1817
message STRING
1918
)
2019
"""
21-
client.query(post_sql)
2220
insights_sql = """
2321
CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_linkedin_posts_insights` (
2422
post_id STRING,
@@ -30,6 +28,8 @@ def create_table_if_needed() -> None:
3028
views INTEGER
3129
)
3230
"""
31+
client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
32+
client.query(post_sql)
3333
client.query(insights_sql)
3434

3535
# Example output from the Rapid API; not all fields will exist for a specific post

0 commit comments

Comments
 (0)