Skip to content

Commit d2cc8b1

Browse files
committed
dlt: Hello, World!
1 parent 66eafe1 commit d2cc8b1

File tree

12 files changed

+507
-0
lines changed

12 files changed

+507
-0
lines changed

.github/dependabot.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,11 @@ updates:
147147
schedule:
148148
interval: "daily"
149149

150+
- directory: "/framework/dlt"
151+
package-ecosystem: "pip"
152+
schedule:
153+
interval: "daily"
154+
150155
- directory: "/framework/flink/kafka-jdbcsink-java"
151156
package-ecosystem: "docker-compose"
152157
schedule:

.github/workflows/framework-dlt.yml

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
name: dlt
2+
3+
on:
4+
pull_request:
5+
paths:
6+
- '.github/workflows/framework-dlt.yml'
7+
- 'framework/dlt/**'
8+
- '/requirements.txt'
9+
push:
10+
branches: [ main ]
11+
paths:
12+
- '.github/workflows/framework-dlt.yml'
13+
- 'framework/dlt/**'
14+
- '/requirements.txt'
15+
16+
# Allow job to be triggered manually.
17+
workflow_dispatch:
18+
19+
# Run job each night after CrateDB nightly has been published.
20+
schedule:
21+
- cron: '0 3 * * *'
22+
23+
# Cancel in-progress jobs when pushing to the same branch.
24+
concurrency:
25+
cancel-in-progress: true
26+
group: ${{ github.workflow }}-${{ github.ref_name }}
27+
28+
jobs:
29+
test:
30+
name: "
31+
Python: ${{ matrix.python-version }}
32+
CrateDB: ${{ matrix.cratedb-version }}
33+
on ${{ matrix.os }}"
34+
runs-on: ${{ matrix.os }}
35+
strategy:
36+
fail-fast: false
37+
matrix:
38+
os: [ 'ubuntu-latest' ]
39+
python-version: [ '3.9', '3.13' ]
40+
cratedb-version: [ 'nightly' ]
41+
42+
services:
43+
cratedb:
44+
image: crate/crate:${{ matrix.cratedb-version }}
45+
ports:
46+
- 4200:4200
47+
- 5432:5432
48+
env:
49+
CRATE_HEAP_SIZE: 4g
50+
51+
steps:
52+
53+
- name: Acquire sources
54+
uses: actions/checkout@v4
55+
56+
- name: Set up Python
57+
uses: actions/setup-python@v5
58+
with:
59+
python-version: ${{ matrix.python-version }}
60+
architecture: x64
61+
cache: 'pip'
62+
cache-dependency-path: |
63+
requirements.txt
64+
framework/dlt/requirements.txt
65+
framework/dlt/requirements-dev.txt
66+
67+
- name: Install utilities
68+
run: |
69+
pip install -r requirements.txt
70+
71+
- name: Validate framework/dlt
72+
run: |
73+
ngr test --accept-no-venv framework/dlt

framework/dlt/.dlt/config.toml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# Put your main configuration values here.
2+
#add_dlt_id = false
3+
#add_dlt_load_id = false
4+
5+
[runtime]
6+
7+
# The system log level of dlt.
8+
log_level="DEBUG"
9+
10+
# Use the `dlthub_telemetry` setting to enable/disable anonymous
11+
# usage data reporting, see https://dlthub.com/docs/reference/telemetry.
12+
dlthub_telemetry = false

framework/dlt/.dlt/secrets.toml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
[destination.cratedb.credentials]
2+
# CrateDB PostgreSQL interface
3+
host = "localhost"
4+
port = 5432
5+
username = "crate"
6+
password = ""
7+
8+
[destination.sqlalchemy.credentials]
9+
# CrateDB HTTP interface
10+
# https://dlthub.com/docs/dlt-ecosystem/destinations/sqlalchemy
11+
drivername = "crate"
12+
host = "localhost"
13+
port = 4200
14+
database = ""
15+
username = "crate"
16+
password = ""
17+
18+
[sources.sql_database.credentials]
19+
# CrateDB HTTP interface
20+
# https://dlthub.com/docs/dlt-ecosystem/verified-sources/sql_database/setup
21+
drivername = "crate"
22+
host = "localhost"
23+
port = 4200
24+
database = ""
25+
username = "crate"
26+
password = ""

framework/dlt/.gitignore

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# ignore secrets, virtual environments and typical python compilation artifacts
2+
# NOTE: secrets.toml is intentionally tracked here, to provide out-of-the-box settings for localhost
3+
# secrets.toml
4+
# ignore basic python artifacts
5+
.env
6+
**/__pycache__/
7+
**/*.py[cod]
8+
**/*$py.class
9+
# ignore duckdb
10+
*.duckdb
11+
*.wal

framework/dlt/README.md

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# dlt with CrateDB example
2+
3+
## About
4+
Demonstrate connectivity from dlt to CrateDB.
5+
6+
## Configuration
7+
Configure database connection address and credentials in `.dlt/secrets.toml`.
8+
Please make sure to use valid credentials matching your environment.
9+
10+
For [CrateDB] on localhost, a default configuration snippet looks like this.
11+
```toml
12+
[destination.cratedb.credentials]
13+
host = "localhost" # CrateDB server host.
14+
port = 5432 # CrateDB PostgreSQL TCP protocol port, default is 5432.
15+
username = "crate" # CrateDB username, default is usually "crate".
16+
password = "" # CrateDB password, if any.
17+
```
18+
19+
For [CrateDB Cloud], a configuration snippet looks like this.
20+
```toml
21+
[destination.cratedb.credentials]
22+
host = "<CLUSTERNAME>.eks1.eu-west-1.aws.cratedb.net" # CrateDB server host.
23+
port = 5432 # CrateDB PostgreSQL TCP protocol port, default is 5432.
24+
username = "admin" # CrateDB username, default is usually "admin".
25+
password = "<PASSWORD>" # CrateDB password, if any.
26+
```
27+
28+
## Usage
29+
30+
Install dependencies.
31+
```shell
32+
pip install -r requirements.txt
33+
```
34+
35+
Invoke two example pipelines.
36+
```shell
37+
python basic.py
38+
python pokemon.py
39+
```
40+
41+
## Appendix
42+
43+
### CrateDB on localhost
44+
Start a CrateDB instance on your machine.
45+
```shell
46+
docker run -it --rm \
47+
--publish=4200:4200 --publish=5432:5432 \
48+
--env=CRATE_HEAP_SIZE=2g \
49+
crate:latest -Cdiscovery.type=single-node
50+
```
51+
52+
### Sandbox
53+
Acquire the `cratedb-examples` repository, and set up a development sandbox.
54+
```shell
55+
git clone https://github.com/crate/cratedb-examples
56+
cd cratedb-examples
57+
python3 -m venv .venv
58+
source .venv/bin/activate
59+
pip install -r requirements.txt
60+
```
61+
62+
### Software tests
63+
Invoke the integration test cases.
64+
```shell
65+
ngr test framework/dlt
66+
```
67+
68+
69+
[CrateDB]: https://github.com/crate/crate
70+
[CrateDB Cloud]: https://console.cratedb.cloud/

framework/dlt/basic.py

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
"""The Intro Pipeline Template contains the example from the docs intro page"""
2+
import os
3+
from typing import Optional
4+
import pandas as pd
5+
import sqlalchemy as sa
6+
7+
import dlt
8+
from dlt.sources.helpers import requests
9+
10+
11+
CRATEDB_ADDRESS = os.getenv("CRATEDB_ADDRESS", "postgresql://crate:@localhost:5432/")
12+
13+
14+
def load_api_data() -> None:
    """Load chess player profiles from the Chess.com API into CrateDB."""

    # Target the CrateDB destination; the dataset maps to the "doc" schema.
    pipeline = dlt.pipeline(
        pipeline_name="from_api",
        destination=dlt.destinations.cratedb(CRATEDB_ADDRESS),
        dataset_name="doc",
    )

    def fetch_profile(handle: str) -> dict:
        # Fail fast on HTTP errors instead of loading partial data.
        resp = requests.get(f"https://api.chess.com/pub/player/{handle}", timeout=30)
        resp.raise_for_status()
        return resp.json()

    records = [fetch_profile(handle) for handle in ("magnuscarlsen", "rpragchess")]

    # Extract, normalize, and load the data in a single call.
    load_info = pipeline.run(data=records, table_name="chess_players")
    print(load_info)  # noqa: T201
38+
39+
40+
def load_pandas_data() -> None:
    """Load the OWID natural-disasters dataset (public CSV) into CrateDB via pandas."""

    csv_url = (
        "https://raw.githubusercontent.com/owid/owid-datasets/master/datasets/"
        "Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020)/"
        "Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020).csv"
    )
    # pandas fetches the CSV over HTTP and parses it into a DataFrame.
    frame = pd.read_csv(csv_url)

    pipeline = dlt.pipeline(
        pipeline_name="from_csv",
        destination=dlt.destinations.cratedb(CRATEDB_ADDRESS),
        dataset_name="doc",
    )
    load_info = pipeline.run(data=frame, table_name="natural_disasters")

    print(load_info)  # noqa: T201
61+
62+
63+
def load_sql_data() -> None:
    """Stream rows from a public MySQL database into CrateDB using SQLAlchemy."""

    # Any SQLAlchemy-supported database works; here the public EBI Rfam
    # MySQL instance provides sample data.
    # NOTE: requires pymysql (`pip install pymysql`).
    # NOTE: depends on external public database availability and may be slow.
    engine = sa.create_engine(
        "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
    )

    with engine.connect() as connection:
        # Stream the genome table in batches of 100 rows.
        cursor = connection.execution_options(yield_per=100).exec_driver_sql(
            "SELECT * FROM genome LIMIT 1000"
        )

        pipeline = dlt.pipeline(
            pipeline_name="from_database",
            destination=dlt.destinations.cratedb(CRATEDB_ADDRESS),
            dataset_name="doc",
        )

        # Convert each row into a plain dict lazily while loading.
        load_info = pipeline.run(
            data=(dict(record._mapping) for record in cursor),
            table_name="genome",
        )

    print(load_info)  # noqa: T201
93+
94+
95+
@dlt.resource(write_disposition="replace")
def github_api_resource(api_secret_key: Optional[str] = dlt.secrets.value):
    """Yield pages of open issues from the dlt GitHub repository."""
    from dlt.sources.helpers.rest_client import paginate
    from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
    from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator

    # GitHub allows both authenticated and non-authenticated requests
    # (the latter with low rate limits).
    token_auth = BearerTokenAuth(api_secret_key) if api_secret_key else None

    yield from paginate(
        "https://api.github.com/repos/dlt-hub/dlt/issues",
        auth=token_auth,
        paginator=HeaderLinkPaginator(),
        params={"state": "open", "per_page": "100"},
    )
112+
113+
114+
@dlt.source
def github_api_source(api_secret_key: Optional[str] = dlt.secrets.value):
    """Bundle the GitHub API resource into a dlt source."""
    return github_api_resource(api_secret_key=api_secret_key)
117+
118+
119+
def load_github_data() -> None:
    """Load open GitHub issues into CrateDB via the github_api_source."""
    pipeline = dlt.pipeline(
        pipeline_name="github_api_pipeline",
        destination=dlt.destinations.cratedb(CRATEDB_ADDRESS),
        dataset_name="doc",
    )
    load_info = pipeline.run(data=github_api_source(), table_name="github_api_data")
    print(load_info)  # noqa: T201
131+
132+
133+
def main():
    """Run each example pipeline, continuing past individual failures."""
    # Best effort: each example relies on external services which may be
    # temporarily unavailable, so failures are reported but not fatal.
    for func in (
        load_api_data,
        load_pandas_data,
        load_sql_data,
        load_github_data,
    ):
        try:
            func()
        except Exception as e:
            print(f"Error in {func.__name__}: {e}")  # noqa: T201


if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)