
Commit e2737e0

move to batch sink (#3)
* fix missing commas, and retry on 500 errors
* move from a record sink to a batch sink; this allows us to add rows only when they do not already exist in the database
* add a type annotation
1 parent f6b3be7 commit e2737e0
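
Why the move matters: with singer_sdk's RecordSink, process_record() fires once per record, so there is no natural point to check which rows already exist before writing. A BatchSink instead buffers records into context["records"] and hands the whole batch to process_batch(), where a single lookup can filter out rows that are already in Notion. A minimal sketch of that shape, assuming the default process_record() buffering described in the docstring in the diff below; lookup_existing_keys and write_row are hypothetical helpers, not singer_sdk API:

from singer_sdk.sinks import BatchSink

class DedupingBatchSink(BatchSink):
    """Sketch: write only rows that are not already in the destination."""

    MAX_SIZE_DEFAULT = 100  # flush after at most 100 buffered records

    def process_batch(self, context: dict) -> None:
        # The default BatchSink.process_record() appends each record to
        # context["records"], so the whole batch is available here.
        records = context["records"]
        existing = self.lookup_existing_keys(records)  # hypothetical bulk query
        for record in records:
            if record["id"] not in existing:
                self.write_row(record)  # hypothetical single-row create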

File tree

1 file changed: +60 -6 lines changed


target_notion/sinks.py

Lines changed: 60 additions & 6 deletions
@@ -6,30 +6,84 @@
 from notion_client import Client
 from notion_client.errors import HTTPResponseError
 from retry import retry
-from singer_sdk.sinks import RecordSink
+from singer_sdk.sinks import BatchSink


-class notionSink(RecordSink):
+class notionSink(BatchSink):
     """notion target sink class."""

+    MAX_SIZE_DEFAULT = 100
+
     def __init__(self, **kwargs) -> None:  # noqa: ANN003
         """Initialize the sink."""
         super().__init__(**kwargs)
         self.client = Client(auth=self.config["api_key"])
         self.database_schema = self.get_database_schema()
+        self.key_property = self.key_properties[0]
+        self.snake_key_property = snakecase(self.key_property)
+        self.database_key_property = self.database_schema[self.snake_key_property]["name"]
+
+    def process_batch(self, context: dict) -> None:
+        """Process a batch with the given batch context.
+
+        This method must be overridden.
+
+        If :meth:`~singer_sdk.BatchSink.process_record()` is not overridden,
+        the `context["records"]` list will contain all records from the given batch
+        context.
+
+        If duplicates are merged, these can be tracked via
+        :meth:`~singer_sdk.Sink.tally_duplicate_merged()`.
+
+        Args:
+            context: Stream partition or context dictionary.
+        """
+        records = [{snakecase(key): value for key, value in record.items()} for record in context["records"]]
+        existing_pages = self.get_existing_pages(records)
+        filtered_records = [record for record in records if record[self.snake_key_property] not in existing_pages]
+        self.logger.info(f"Creating {len(filtered_records)}/{len(records)} pages.")
+        for record in filtered_records:
+            self.create_page(record)
+
+    def get_existing_pages(self, records: list[dict]) -> dict:
+        """Get existing pages in the database."""
+        _filter = {
+            "or": [
+                {"property": self.database_key_property, "title": {"equals": record[self.snake_key_property]}}
+                for record in records
+            ]
+        }
+        has_more = True
+        start_cursor = None
+        existing_pages = {}
+        while has_more:
+            pages = self.client.databases.query(
+                database_id=self.config["database_id"],
+                start_cursor=start_cursor,
+                filter_properties=[],
+                filter=_filter,
+            )
+            existing_pages.update(
+                {
+                    page["properties"][self.database_key_property]["rich_text"][0]["text"]["content"]: page["id"]
+                    for page in pages["results"]
+                }
+            )
+            has_more = pages["has_more"]
+            start_cursor = pages.get("next_cursor")
+        return existing_pages

     @retry(HTTPResponseError, tries=3, delay=1, backoff=4, max_delay=10)
-    def process_record(self, record: dict, context: dict) -> None:
-        """Process the record.
+    def create_page(self, record: dict) -> None:
+        """Create the page.

         Args:
             record: Individual record in the stream.
             context: Stream partition or context dictionary.
         """
-        snakecase_record = {snakecase(key): value for key, value in record.items()}
         self.client.pages.create(
             parent={"database_id": self.config["database_id"]},
-            properties=self.create_page_properties(snakecase_record),
+            properties=self.create_page_properties(record),
         )

     def get_database_schema(self) -> dict:
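
For reference, the cursor pagination that get_existing_pages performs inline follows the standard Notion pattern: query, collect results, and re-query with next_cursor until has_more is false. A minimal standalone sketch of the same loop as a generator, assuming the response shape the sink already relies on (results, has_more, next_cursor); query_all_pages is a hypothetical helper, not part of notion_client:

from notion_client import Client

def query_all_pages(client: Client, database_id: str, query_filter: dict):
    """Yield every page matching query_filter, following Notion's cursor pagination."""
    start_cursor = None
    while True:
        response = client.databases.query(
            database_id=database_id,
            start_cursor=start_cursor,
            filter=query_filter,
        )
        yield from response["results"]
        if not response["has_more"]:
            break
        start_cursor = response["next_cursor"]

With a helper like this, the body of get_existing_pages reduces to one dict comprehension over the yielded pages.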
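
The @retry decorator (from the retry package) is what implements the "retry on 500 errors" item in the commit message: create_page is attempted up to tries=3 times whenever it raises HTTPResponseError, sleeping delay=1 second after the first failure and multiplying the wait by backoff=4 on each retry, with max_delay=10 capping any longer waits. A small sketch of the same decorator on a hypothetical stand-in function:

from notion_client.errors import HTTPResponseError
from retry import retry

@retry(HTTPResponseError, tries=3, delay=1, backoff=4, max_delay=10)
def flaky_notion_call() -> None:
    # attempt 1 fails -> sleep 1s; attempt 2 fails -> sleep 4s (1 * 4);
    # attempt 3 fails -> HTTPResponseError propagates to the caller.
    ...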

0 commit comments
