Skip to content

Commit 9180484

Browse files
authored
Merge pull request #203 from bruin-data/feat/add-stripe-top-level-endpoints
feat: Add more top-level Stripe endpoints and unit tests
2 parents 9432d82 + 5ee4228 commit 9180484

File tree

4 files changed

+113
-53
lines changed

4 files changed

+113
-53
lines changed

ingestr/main_test.py

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3612,13 +3612,58 @@ def test_s3_destination():
36123612
)
36133613

36143614

3615-
@pytest.mark.parametrize("stripe_table", ["subscription", "customer", "charge"])
3616-
def test_stripe_source(stripe_table):
3615+
@pytest.mark.parametrize(
3616+
"stripe_table",
3617+
[
3618+
"subscription",
3619+
"customer",
3620+
"product",
3621+
"price",
3622+
"event",
3623+
"invoice",
3624+
"charge",
3625+
"balancetransaction",
3626+
],
3627+
)
3628+
def test_stripe_source_full_refresh(stripe_table):
3629+
# Get Stripe token from environment
3630+
stripe_token = os.environ.get("INGESTR_TEST_STRIPE_TOKEN")
3631+
if not stripe_token:
3632+
pytest.skip("INGESTR_TEST_STRIPE_TOKEN not set")
3633+
3634+
# Create test database
3635+
dbname = f"test_stripe_{stripe_table}{get_random_string(5)}.db"
3636+
abs_db_path = get_abs_path(f"./testdata/{dbname}")
3637+
rel_db_path_to_command = f"ingestr/testdata/{dbname}"
3638+
uri = f"duckdb:///{rel_db_path_to_command}"
3639+
3640+
conn = duckdb.connect(abs_db_path)
3641+
3642+
# Run ingest command
3643+
result = invoke_ingest_command(
3644+
f"stripe://{stripe_table}s?api_key={stripe_token}",
3645+
stripe_table,
3646+
uri,
3647+
f"raw.{stripe_table}s",
3648+
)
3649+
3650+
assert result.exit_code == 0
3651+
3652+
# Verify data was loaded
3653+
res = conn.sql(f"select count(*) from raw.{stripe_table}s").fetchone()
3654+
assert res[0] > 0, f"No {stripe_table} records found"
3655+
3656+
# Clean up
36173657
try:
3618-
shutil.rmtree(get_abs_path("../pipeline_data"))
3658+
os.remove(abs_db_path)
36193659
except Exception:
36203660
pass
36213661

3662+
3663+
@pytest.mark.parametrize(
3664+
"stripe_table", ["event", "invoice", "charge", "balancetransaction"]
3665+
)
3666+
def test_stripe_source_incremental(stripe_table):
36223667
# Get Stripe token from environment
36233668
stripe_token = os.environ.get("INGESTR_TEST_STRIPE_TOKEN")
36243669
if not stripe_token:
@@ -3638,6 +3683,8 @@ def test_stripe_source(stripe_table):
36383683
stripe_table,
36393684
uri,
36403685
f"raw.{stripe_table}s",
3686+
interval_start="2025-04-01",
3687+
interval_end="2025-05-30",
36413688
)
36423689

36433690
assert result.exit_code == 0

ingestr/src/sources.py

Lines changed: 40 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -698,46 +698,47 @@ def dlt_source(self, uri: str, table: str, **kwargs):
698698
if not api_key:
699699
raise ValueError("api_key in the URI is required to connect to Stripe")
700700

701-
endpoint = None
702-
if table == "balancetransaction":
703-
table = "BalanceTransaction"
704-
else:
705-
table = table.capitalize()
701+
table = table.lower()
706702

707-
if table in [
708-
"Subscription",
709-
"Account",
710-
"Coupon",
711-
"Customer",
712-
"Product",
713-
"Price",
714-
"BalanceTransaction",
715-
"Invoice",
716-
"Event",
717-
"Charge",
718-
]:
719-
endpoint = table
720-
else:
721-
raise ValueError(
722-
f"Resource '{table}' is not supported for stripe source yet, if you are interested in it please create a GitHub issue at https://github.com/bruin-data/ingestr"
723-
)
724-
725-
date_args = {}
726-
if kwargs.get("interval_start"):
727-
date_args["start_date"] = kwargs.get("interval_start")
728-
729-
if kwargs.get("interval_end"):
730-
date_args["end_date"] = kwargs.get("interval_end")
731-
732-
from ingestr.src.stripe_analytics import stripe_source
703+
from ingestr.src.stripe_analytics.settings import (
704+
ENDPOINTS,
705+
INCREMENTAL_ENDPOINTS,
706+
)
733707

734-
return stripe_source(
735-
endpoints=[
736-
endpoint,
737-
],
738-
stripe_secret_key=api_key[0],
739-
**date_args,
740-
).with_resources(endpoint)
708+
if table in ENDPOINTS:
709+
endpoint = ENDPOINTS[table]
710+
from ingestr.src.stripe_analytics import stripe_source
711+
712+
return stripe_source(
713+
endpoints=[
714+
endpoint,
715+
],
716+
stripe_secret_key=api_key[0],
717+
start_date=kwargs.get("interval_start", None),
718+
end_date=kwargs.get("interval_end", None),
719+
).with_resources(endpoint)
720+
721+
elif table in INCREMENTAL_ENDPOINTS:
722+
endpoint = INCREMENTAL_ENDPOINTS[table]
723+
from ingestr.src.stripe_analytics import incremental_stripe_source
724+
725+
def nullable_date(date_str: Optional[str]):
726+
if date_str:
727+
return ensure_pendulum_datetime(date_str)
728+
return None
729+
730+
return incremental_stripe_source(
731+
endpoints=[
732+
endpoint,
733+
],
734+
stripe_secret_key=api_key[0],
735+
initial_start_date=nullable_date(kwargs.get("interval_start", None)),
736+
end_date=nullable_date(kwargs.get("interval_end", None)),
737+
).with_resources(endpoint)
738+
739+
raise ValueError(
740+
f"Resource '{table}' is not supported for stripe source yet, if you are interested in it please create a GitHub issue at https://github.com/bruin-data/ingestr"
741+
)
741742

742743

743744
class FacebookAdsSource:
@@ -2464,6 +2465,7 @@ def dlt_source(self, uri: str, table: str, **kwargs):
24642465
sheet_id=table, # table is now a single sheet_id
24652466
)
24662467

2468+
24672469
class SolidgateSource:
24682470
def handles_incrementality(self) -> bool:
24692471
return True

ingestr/src/stripe_analytics/__init__.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,11 @@
88
from pendulum import DateTime
99

1010
from .helpers import pagination, transform_date
11-
from .settings import ENDPOINTS, INCREMENTAL_ENDPOINTS
1211

1312

1413
@dlt.source(max_table_nesting=0)
1514
def stripe_source(
16-
endpoints: Tuple[str, ...] = ENDPOINTS,
15+
endpoints: Tuple[str, ...],
1716
stripe_secret_key: str = dlt.secrets.value,
1817
start_date: Optional[DateTime] = None,
1918
end_date: Optional[DateTime] = None,
@@ -53,7 +52,7 @@ def stripe_resource(
5352

5453
@dlt.source
5554
def incremental_stripe_source(
56-
endpoints: Tuple[str, ...] = INCREMENTAL_ENDPOINTS,
55+
endpoints: Tuple[str, ...],
5756
stripe_secret_key: str = dlt.secrets.value,
5857
initial_start_date: Optional[DateTime] = None,
5958
end_date: Optional[DateTime] = None,

ingestr/src/stripe_analytics/settings.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,25 @@
22

33
# the most popular endpoints
44
# Full list of the Stripe API endpoints you can find here: https://stripe.com/docs/api.
5-
ENDPOINTS = (
6-
"Subscription",
7-
"Account",
8-
"Coupon",
9-
"Customer",
10-
"Product",
11-
"Price",
12-
)
5+
ENDPOINTS = {
6+
"subscription": "Subscription",
7+
"account": "Account",
8+
"coupon": "Coupon",
9+
"customer": "Customer",
10+
"product": "Product",
11+
"price": "Price",
12+
"shippingrate": "ShippingRate",
13+
"dispute": "Dispute",
14+
"subscriptionitem": "SubscriptionItem",
15+
"checkoutsession": "CheckoutSession",
16+
}
1317
# possible incremental endpoints
14-
INCREMENTAL_ENDPOINTS = ("Event", "Invoice", "BalanceTransaction", "Charge")
18+
INCREMENTAL_ENDPOINTS = {
19+
"event": "Event",
20+
"invoice": "Invoice",
21+
"balancetransaction": "BalanceTransaction",
22+
"charge": "Charge",
23+
"applicationfee": "ApplicationFee",
24+
"setupattempt": "SetupAttempt",
25+
"creditnote": "CreditNote",
26+
}

0 commit comments

Comments
 (0)