Skip to content

Commit 3488a6b

Browse files
committed
refactor(dag:*post_insights): extract common _dump_to_bigquery logic
1 parent 819eb27 commit 3488a6b

File tree

2 files changed

+34
-59
lines changed

2 files changed

+34
-59
lines changed

dags/utils/posts_insights/base.py

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import os
33
from abc import ABC, abstractmethod
44
from functools import cached_property
5+
from typing import Literal
56

67
from google.cloud import bigquery
78

@@ -86,39 +87,24 @@ def _query_last_post(self) -> dict | None:
8687
def _process_posts(self, posts: list[dict]) -> list[dict]: ...
8788

8889
def _dump_posts_to_bigquery(self, posts: list[dict]) -> None:
89-
if not posts:
90-
logger.info("No posts to dump!")
91-
return
92-
93-
job_config = bigquery.LoadJobConfig(
94-
schema=[
90+
self._dump_to_bigquery(
91+
posts=posts,
92+
dump_type="posts",
93+
bq_schema_fields=[
9594
bigquery.SchemaField("id", "STRING", mode="REQUIRED"),
9695
bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"),
9796
bigquery.SchemaField("message", "STRING", mode="REQUIRED"),
9897
],
99-
write_disposition="WRITE_APPEND",
10098
)
101-
try:
102-
job = self.bq_client.load_table_from_json(
103-
posts,
104-
f"pycontw-225217.ods.{self.POST_TABLE_NAME}",
105-
job_config=job_config,
106-
)
107-
job.result()
108-
except Exception:
109-
logger.exception("Failed to dump posts to BigQuery: ")
110-
raise RuntimeError("Failed to dump posts insights to BigQuery")
11199

112100
@abstractmethod
113101
def _process_posts_insights(self, posts: list[dict]) -> list[dict]: ...
114102

115103
def _dump_posts_insights_to_bigquery(self, posts: list[dict]) -> None:
116-
if not posts:
117-
logger.info("No post insights to dump!")
118-
return
119-
120-
job_config = bigquery.LoadJobConfig(
121-
schema=[
104+
self._dump_to_bigquery(
105+
posts=posts,
106+
dump_type="posts insights",
107+
bq_schema_fields=[
122108
bigquery.SchemaField("post_id", "STRING", mode="REQUIRED"),
123109
bigquery.SchemaField("query_time", "TIMESTAMP", mode="REQUIRED"),
124110
bigquery.SchemaField("period", "STRING", mode="REQUIRED"),
@@ -127,6 +113,21 @@ def _dump_posts_insights_to_bigquery(self, posts: list[dict]) -> None:
127113
bigquery.SchemaField("retweet", "INTEGER", mode="NULLABLE"),
128114
bigquery.SchemaField("views", "INTEGER", mode="NULLABLE"),
129115
],
116+
)
117+
118+
def _dump_to_bigquery(
119+
self,
120+
*,
121+
posts: list[dict],
122+
dump_type: Literal["posts insights", "posts"],
123+
bq_schema_fields: list[bigquery.SchemaField],
124+
) -> None:
125+
if not posts:
126+
logger.info(f"No {dump_type} to dump!")
127+
return
128+
129+
job_config = bigquery.LoadJobConfig(
130+
schema=bq_schema_fields,
130131
write_disposition="WRITE_APPEND",
131132
)
132133
try:
@@ -137,5 +138,5 @@ def _dump_posts_insights_to_bigquery(self, posts: list[dict]) -> None:
137138
)
138139
job.result()
139140
except Exception:
140-
logger.exception("Failed to dump posts insights to BigQuery: ")
141-
raise RuntimeError("Failed to dump posts insights to BigQuery")
141+
logger.exception(f"Failed to dump {dump_type} to BigQuery: ")
142+
raise RuntimeError(f"Failed to dump {dump_type} to BigQuery")

dags/utils/posts_insights/facebook.py

Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -63,31 +63,18 @@ def _process_posts(self, posts: list[dict]) -> list[dict]:
6363
]
6464

6565
def _dump_posts_to_bigquery(self, posts: list[dict]) -> None:
66-
if not posts:
67-
logger.info("No posts to dump!")
68-
return
69-
70-
job_config = bigquery.LoadJobConfig(
71-
schema=[
66+
self._dump_to_bigquery(
67+
posts=posts,
68+
dump_type="posts",
69+
bq_schema_fields=[
7270
bigquery.SchemaField(field_name, field_type, mode="REQUIRED")
7371
for field_name, field_type in [
7472
("id", "STRING"),
7573
("created_at", "TIMESTAMP"),
7674
("message", "STRING"),
7775
]
7876
],
79-
write_disposition="WRITE_APPEND",
8077
)
81-
try:
82-
job = self.bq_client.load_table_from_json(
83-
posts,
84-
"pycontw-225217.ods.ods_pycontw_fb_posts",
85-
job_config=job_config,
86-
)
87-
job.result()
88-
except Exception:
89-
logger.exception("Failed to dump posts to BigQuery: ")
90-
raise RuntimeError("Failed to dump posts to BigQuery")
9178

9279
def _process_posts_insights(self, posts: list[dict]) -> list[dict]:
9380
return [
@@ -102,30 +89,17 @@ def _process_posts_insights(self, posts: list[dict]) -> list[dict]:
10289
]
10390

10491
def _dump_posts_insights_to_bigquery(self, posts: list[dict]) -> None:
105-
if not posts:
106-
logger.info("No post insights to dump!")
107-
return
108-
109-
job_config = bigquery.LoadJobConfig(
110-
schema=[
92+
self._dump_to_bigquery(
93+
posts=posts,
94+
dump_type="posts insights",
95+
bq_schema_fields=[
11196
bigquery.SchemaField("post_id", "STRING", mode="REQUIRED"),
11297
bigquery.SchemaField("query_time", "TIMESTAMP", mode="REQUIRED"),
11398
bigquery.SchemaField("comments", "INTEGER", mode="NULLABLE"),
11499
bigquery.SchemaField("reactions", "INTEGER", mode="NULLABLE"),
115100
bigquery.SchemaField("share", "INTEGER", mode="NULLABLE"),
116101
],
117-
write_disposition="WRITE_APPEND",
118102
)
119-
try:
120-
job = self.bq_client.load_table_from_json(
121-
posts,
122-
"pycontw-225217.ods.ods_pycontw_fb_posts_insights",
123-
job_config=job_config,
124-
)
125-
job.result()
126-
except Exception:
127-
logger.exception("Failed to dump posts insights to BigQuery: ")
128-
raise RuntimeError("Failed to dump posts insights to BigQuery")
129103

130104

131105
def convert_fb_time(time_string: str) -> str:

0 commit comments

Comments (0)