Skip to content

feat: Add more top-level Stripe endpoints and unit tests #203

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 50 additions & 3 deletions ingestr/main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3612,13 +3612,58 @@ def test_s3_destination():
)


@pytest.mark.parametrize("stripe_table", ["subscription", "customer", "charge"])
def test_stripe_source(stripe_table):
@pytest.mark.parametrize(
    "stripe_table",
    [
        "subscription",
        "customer",
        "product",
        "price",
        "event",
        "invoice",
        "charge",
        "balancetransaction",
    ],
)
def test_stripe_source_full_refresh(stripe_table):
    """Ingest one Stripe table into a throwaway DuckDB file and check rows landed.

    The test is skipped when ``INGESTR_TEST_STRIPE_TOKEN`` is not set. The
    temporary database file and the pipeline working directory are removed
    whether the assertions pass or fail, so a failing run leaves no artifacts
    behind.
    """
    # Get Stripe token from environment
    stripe_token = os.environ.get("INGESTR_TEST_STRIPE_TOKEN")
    if not stripe_token:
        pytest.skip("INGESTR_TEST_STRIPE_TOKEN not set")

    # Create test database
    dbname = f"test_stripe_{stripe_table}{get_random_string(5)}.db"
    abs_db_path = get_abs_path(f"./testdata/{dbname}")
    rel_db_path_to_command = f"ingestr/testdata/{dbname}"
    uri = f"duckdb:///{rel_db_path_to_command}"

    conn = duckdb.connect(abs_db_path)
    try:
        # Run ingest command
        result = invoke_ingest_command(
            f"stripe://{stripe_table}s?api_key={stripe_token}",
            stripe_table,
            uri,
            f"raw.{stripe_table}s",
        )

        assert result.exit_code == 0

        # Verify data was loaded
        res = conn.sql(f"select count(*) from raw.{stripe_table}s").fetchone()
        assert res[0] > 0, f"No {stripe_table} records found"
    finally:
        # Clean up. Close the connection first so the database file is no
        # longer held open when it is deleted.
        conn.close()
        # Best-effort: each cleanup step is independent, so a missing
        # pipeline directory must not prevent the database file removal.
        shutil.rmtree(get_abs_path("../pipeline_data"), ignore_errors=True)
        try:
            os.remove(abs_db_path)
        except OSError:
            # The file may already be gone; cleanup must never fail the test.
            pass


@pytest.mark.parametrize(
"stripe_table", ["event", "invoice", "charge", "balancetransaction"]
)
def test_stripe_source_incremental(stripe_table):
# Get Stripe token from environment
stripe_token = os.environ.get("INGESTR_TEST_STRIPE_TOKEN")
if not stripe_token:
Expand All @@ -3638,6 +3683,8 @@ def test_stripe_source(stripe_table):
stripe_table,
uri,
f"raw.{stripe_table}s",
interval_start="2025-04-01",
interval_end="2025-05-30",
)

assert result.exit_code == 0
Expand Down
78 changes: 40 additions & 38 deletions ingestr/src/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -698,46 +698,47 @@ def dlt_source(self, uri: str, table: str, **kwargs):
if not api_key:
raise ValueError("api_key in the URI is required to connect to Stripe")

endpoint = None
if table == "balancetransaction":
table = "BalanceTransaction"
else:
table = table.capitalize()
table = table.lower()

if table in [
"Subscription",
"Account",
"Coupon",
"Customer",
"Product",
"Price",
"BalanceTransaction",
"Invoice",
"Event",
"Charge",
]:
endpoint = table
else:
raise ValueError(
f"Resource '{table}' is not supported for stripe source yet, if you are interested in it please create a GitHub issue at https://github.com/bruin-data/ingestr"
)

date_args = {}
if kwargs.get("interval_start"):
date_args["start_date"] = kwargs.get("interval_start")

if kwargs.get("interval_end"):
date_args["end_date"] = kwargs.get("interval_end")

from ingestr.src.stripe_analytics import stripe_source
from ingestr.src.stripe_analytics.settings import (
ENDPOINTS,
INCREMENTAL_ENDPOINTS,
)

return stripe_source(
endpoints=[
endpoint,
],
stripe_secret_key=api_key[0],
**date_args,
).with_resources(endpoint)
if table in ENDPOINTS:
endpoint = ENDPOINTS[table]
from ingestr.src.stripe_analytics import stripe_source

return stripe_source(
endpoints=[
endpoint,
],
stripe_secret_key=api_key[0],
start_date=kwargs.get("interval_start", None),
end_date=kwargs.get("interval_end", None),
).with_resources(endpoint)

elif table in INCREMENTAL_ENDPOINTS:
endpoint = INCREMENTAL_ENDPOINTS[table]
from ingestr.src.stripe_analytics import incremental_stripe_source

def nullable_date(date_str: Optional[str]):
if date_str:
return ensure_pendulum_datetime(date_str)
return None

return incremental_stripe_source(
endpoints=[
endpoint,
],
stripe_secret_key=api_key[0],
initial_start_date=nullable_date(kwargs.get("interval_start", None)),
end_date=nullable_date(kwargs.get("interval_end", None)),
).with_resources(endpoint)

raise ValueError(
f"Resource '{table}' is not supported for stripe source yet, if you are interested in it please create a GitHub issue at https://github.com/bruin-data/ingestr"
)


class FacebookAdsSource:
Expand Down Expand Up @@ -2464,6 +2465,7 @@ def dlt_source(self, uri: str, table: str, **kwargs):
sheet_id=table, # table is now a single sheet_id
)


class SolidgateSource:
def handles_incrementality(self) -> bool:
return True
Expand Down
5 changes: 2 additions & 3 deletions ingestr/src/stripe_analytics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@
from pendulum import DateTime

from .helpers import pagination, transform_date
from .settings import ENDPOINTS, INCREMENTAL_ENDPOINTS


@dlt.source(max_table_nesting=0)
def stripe_source(
endpoints: Tuple[str, ...] = ENDPOINTS,
endpoints: Tuple[str, ...],
stripe_secret_key: str = dlt.secrets.value,
start_date: Optional[DateTime] = None,
end_date: Optional[DateTime] = None,
Expand Down Expand Up @@ -53,7 +52,7 @@ def stripe_resource(

@dlt.source
def incremental_stripe_source(
endpoints: Tuple[str, ...] = INCREMENTAL_ENDPOINTS,
endpoints: Tuple[str, ...],
stripe_secret_key: str = dlt.secrets.value,
initial_start_date: Optional[DateTime] = None,
end_date: Optional[DateTime] = None,
Expand Down
30 changes: 21 additions & 9 deletions ingestr/src/stripe_analytics/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,25 @@

# the most popular endpoints
# The full list of Stripe API endpoints is available at https://stripe.com/docs/api.
ENDPOINTS = (
"Subscription",
"Account",
"Coupon",
"Customer",
"Product",
"Price",
)
# Maps the lower-cased table name to the Stripe resource class name.
# Every key is simply its value lower-cased, so the mapping is derived from
# the class names to keep the two spellings from drifting apart.
ENDPOINTS = {
    class_name.lower(): class_name
    for class_name in (
        "Subscription",
        "Account",
        "Coupon",
        "Customer",
        "Product",
        "Price",
        "ShippingRate",
        "Dispute",
        "SubscriptionItem",
        "CheckoutSession",
    )
}
# possible incremental endpoints
INCREMENTAL_ENDPOINTS = ("Event", "Invoice", "BalanceTransaction", "Charge")
# Same shape as a lower-cased-name -> class-name lookup: each key is the
# lower-cased form of its Stripe resource class name.
INCREMENTAL_ENDPOINTS = {
    class_name.lower(): class_name
    for class_name in (
        "Event",
        "Invoice",
        "BalanceTransaction",
        "Charge",
        "ApplicationFee",
        "SetupAttempt",
        "CreditNote",
    )
}