Commit 819eb27

refactor(dag:*post_insights): extract bq_client as cached_property
1 parent 8b505ed commit 819eb27

2 files changed: +13 -15 lines
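For context, functools.cached_property runs the decorated method once per instance on first access and stores the result on the instance, so every later access returns the same object without re-running the body. A minimal, self-contained sketch of that behaviour (toy names, not code from this commit):

```python
# Toy illustration of functools.cached_property; not project code.
from functools import cached_property


class Example:
    @cached_property
    def client(self) -> list:
        print("building client")  # executes only on the first access
        return []


e = Example()
first = e.client    # prints "building client"
second = e.client   # cached on the instance; nothing printed
assert first is second
```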


dags/utils/posts_insights/base.py

Lines changed: 9 additions & 8 deletions
@@ -1,6 +1,7 @@
 import logging
 import os
 from abc import ABC, abstractmethod
+from functools import cached_property
 
 from google.cloud import bigquery
 
@@ -35,15 +36,18 @@ class BasePostsInsightsParser(ABC):
     CREATE_POSTS_TABLE_SQL: str = ""
     CREATE_INSIGHTS_TABLE_SQL: str = ""
 
+    @cached_property
+    def bq_client(self) -> bigquery.Client:
+        return bigquery.Client(project=os.getenv("BIGQUERY_PROJECT", ""))
+
     def create_tables_if_not_exists(self) -> None:
         if not self.CREATE_POSTS_TABLE_SQL and not self.CREATE_INSIGHTS_TABLE_SQL:
             raise ValueError(
                 "Both the SQLs to create table for posts and insights must be set"
             )
 
-        client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT", ""))
         for sql in [self.CREATE_POSTS_TABLE_SQL, self.CREATE_INSIGHTS_TABLE_SQL]:
-            client.query(sql)
+            self.bq_client.query(sql)
 
     def save_posts_and_insights(self) -> None:
         posts = self._request_posts_data()
@@ -65,7 +69,6 @@ def _request_posts_data(self) -> list[dict]: ...
     def _filter_new_posts(self, posts: list[dict], last_post: dict) -> list[dict]: ...
 
     def _query_last_post(self) -> dict | None:
-        client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
         sql = f"""
             SELECT
                 created_at
@@ -75,7 +78,7 @@ def _query_last_post(self) -> dict | None:
                 created_at DESC
             LIMIT 1
         """
-        result = client.query(sql)
+        result = self.bq_client.query(sql)
         data = list(result)
         return data[0] if data else None
 
@@ -87,7 +90,6 @@ def _dump_posts_to_bigquery(self, posts: list[dict]) -> None:
             logger.info("No posts to dump!")
             return
 
-        client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
         job_config = bigquery.LoadJobConfig(
             schema=[
                 bigquery.SchemaField("id", "STRING", mode="REQUIRED"),
@@ -97,7 +99,7 @@ def _dump_posts_to_bigquery(self, posts: list[dict]) -> None:
             write_disposition="WRITE_APPEND",
         )
         try:
-            job = client.load_table_from_json(
+            job = self.bq_client.load_table_from_json(
                 posts,
                 f"pycontw-225217.ods.{self.POST_TABLE_NAME}",
                 job_config=job_config,
@@ -115,7 +117,6 @@ def _dump_posts_insights_to_bigquery(self, posts: list[dict]) -> None:
             logger.info("No post insights to dump!")
             return
 
-        client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
         job_config = bigquery.LoadJobConfig(
             schema=[
                 bigquery.SchemaField("post_id", "STRING", mode="REQUIRED"),
@@ -129,7 +130,7 @@ def _dump_posts_insights_to_bigquery(self, posts: list[dict]) -> None:
             write_disposition="WRITE_APPEND",
        )
         try:
-            job = client.load_table_from_json(
+            job = self.bq_client.load_table_from_json(
                 posts,
                 f"pycontw-225217.ods.{self.INSIGHT_TABLE_NAME}",
                 job_config=job_config,
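One side effect of exposing the client as a cached_property is that it becomes easy to replace in tests: cached_property is a non-data descriptor, so assigning directly to the attribute on an instance shadows it and the real bigquery.Client is never constructed. A hedged, self-contained sketch of that pattern (ToyParser is an illustrative stand-in, not one of the project's parsers):

```python
# Illustrative sketch only: a cached_property attribute can be overridden per
# instance, so tests can inject a fake BigQuery client.
from functools import cached_property
from unittest import mock


class ToyParser:
    @cached_property
    def bq_client(self):
        raise AssertionError("not reached when a fake client is injected")


parser = ToyParser()
parser.bq_client = mock.Mock()        # assignment shadows the cached_property
parser.bq_client.query("SELECT 1")    # calls the fake; no real client is built
```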

dags/utils/posts_insights/facebook.py

Lines changed: 4 additions & 7 deletions
@@ -1,5 +1,4 @@
 import logging
-import os
 from datetime import datetime
 
 import requests
@@ -68,7 +67,6 @@ def _dump_posts_to_bigquery(self, posts: list[dict]) -> None:
             logger.info("No posts to dump!")
             return
 
-        client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
         job_config = bigquery.LoadJobConfig(
             schema=[
                 bigquery.SchemaField(field_name, field_type, mode="REQUIRED")
@@ -81,15 +79,15 @@ def _dump_posts_to_bigquery(self, posts: list[dict]) -> None:
             write_disposition="WRITE_APPEND",
         )
         try:
-            job = client.load_table_from_json(
+            job = self.bq_client.load_table_from_json(
                 posts,
                 "pycontw-225217.ods.ods_pycontw_fb_posts",
                 job_config=job_config,
             )
             job.result()
         except Exception:
             logger.exception("Failed to dump posts to BigQuery: ")
-            raise RuntimeError("Failed to dump posts insights to BigQuery")
+            raise RuntimeError("Failed to dump posts to BigQuery")
 
     def _process_posts_insights(self, posts: list[dict]) -> list[dict]:
         return [
@@ -108,7 +106,6 @@ def _dump_posts_insights_to_bigquery(self, posts: list[dict]) -> None:
             logger.info("No post insights to dump!")
             return
 
-        client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
         job_config = bigquery.LoadJobConfig(
             schema=[
                 bigquery.SchemaField("post_id", "STRING", mode="REQUIRED"),
@@ -120,15 +117,15 @@ def _dump_posts_insights_to_bigquery(self, posts: list[dict]) -> None:
             write_disposition="WRITE_APPEND",
         )
         try:
-            job = client.load_table_from_json(
+            job = self.bq_client.load_table_from_json(
                 posts,
                 "pycontw-225217.ods.ods_pycontw_fb_posts_insights",
                 job_config=job_config,
             )
             job.result()
         except Exception:
             logger.exception("Failed to dump posts insights to BigQuery: ")
-            raise RuntimeError("Failed to dump posts to BigQuery")
+            raise RuntimeError("Failed to dump posts insights to BigQuery")
 
 
 def convert_fb_time(time_string: str) -> str:
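With the client behind a single cached_property, it is also straightforward to check that only one bigquery.Client is constructed per parser instance, however many dump methods run. A self-contained sketch of that check (ToyParser again stands in for the real parsers and mirrors the pattern above; it is not project code):

```python
# Illustrative sketch: verifies the "one client per instance" behaviour that
# the cached_property refactor aims for. Not project code.
import os
from functools import cached_property
from unittest import mock

from google.cloud import bigquery


class ToyParser:
    @cached_property
    def bq_client(self) -> bigquery.Client:
        return bigquery.Client(project=os.getenv("BIGQUERY_PROJECT", ""))

    def dump_posts(self) -> None:
        _ = self.bq_client  # the real parser would call load_table_from_json here

    def dump_insights(self) -> None:
        _ = self.bq_client


with mock.patch("google.cloud.bigquery.Client") as fake_client_cls:
    parser = ToyParser()
    parser.dump_posts()
    parser.dump_insights()
    fake_client_cls.assert_called_once()  # constructed once, then reused
```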
