Commit e42d77c

Ingestr: Add example using a Kafka data source

1 parent 801dde0

11 files changed: 543 additions, 0 deletions

.github/dependabot.yml

Lines changed: 10 additions & 0 deletions
@@ -120,6 +120,16 @@ updates:
     schedule:
       interval: "daily"

+  - directory: "/application/ingestr"
+    package-ecosystem: "pip"
+    schedule:
+      interval: "daily"
+
+  - directory: "/application/ingestr"
+    package-ecosystem: "docker-compose"
+    schedule:
+      interval: "daily"
+
   - directory: "/application/metabase"
     package-ecosystem: "pip"
     schedule:

.github/workflows/application-ingestr.yml

Lines changed: 46 additions & 0 deletions

name: "Ingestr"

on:
  pull_request:
    paths:
      - '.github/workflows/application-ingestr.yml'
      - 'application/ingestr/**'
  push:
    branches: [ main ]
    paths:
      - '.github/workflows/application-ingestr.yml'
      - 'application/ingestr/**'

  # Allow job to be triggered manually.
  workflow_dispatch:

  # Run job each night after CrateDB nightly has been published.
  schedule:
    - cron: '0 3 * * *'

# Cancel in-progress jobs when pushing to the same branch.
concurrency:
  cancel-in-progress: true
  group: ${{ github.workflow }}-${{ github.ref }}

jobs:

  test:
    runs-on: ${{ matrix.os }}

    strategy:
      fail-fast: true
      matrix:
        os: [ "ubuntu-latest" ]

    name: OS ${{ matrix.os }}
    steps:

      - name: Acquire sources
        uses: actions/checkout@v4

      - name: Validate application/ingestr
        run: |
          # TODO: Generalize invocation into `ngr` test runner.
          cd application/ingestr
          bash test.sh
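
Because the workflow declares `workflow_dispatch`, it can also be triggered by hand. A minimal sketch using GitHub's `gh` CLI, assuming repository access:

    # Trigger the workflow manually on the main branch.
    gh workflow run application-ingestr.yml --ref main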

application/ingestr/.dlt/config.toml

Lines changed: 3 additions & 0 deletions
[runtime]

dlthub_telemetry=false
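
ingestr is built on top of dlt, which reads this file. Presumably the same setting could also be supplied through dlt's environment variable convention, where `[section]` keys map to `SECTION__KEY`; a sketch:

    # Hypothetical environment-variable equivalent of the config entry above.
    export RUNTIME__DLTHUB_TELEMETRY=false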

application/ingestr/.env

Lines changed: 26 additions & 0 deletions
# Software component versions.
CONFLUENT_VERSION=7.9.2
CRATEDB_VERSION=5.10
KCAT_VERSION=1.7.1

# Data source configuration (Kafka).
PORT_KAFKA_BROKER_INTERNAL=9092
PORT_KAFKA_BROKER_EXTERNAL=9094
PORT_KAFKA_ZOOKEEPER=2181

# Data sink configuration (CrateDB).
CRATEDB_HOST=${CRATEDB_HOST:-cratedb}
CRATEDB_HTTP_PORT=${CRATEDB_HTTP_PORT:-4200}
CRATEDB_POSTGRESQL_PORT=${CRATEDB_POSTGRESQL_PORT:-5432}
CRATEDB_USERNAME=${CRATEDB_USERNAME:-crate}
CRATEDB_PASSWORD=${CRATEDB_PASSWORD:-}
CRATEDB_HTTP_SCHEME=${CRATEDB_HTTP_SCHEME:-http}

# Data sink configuration (CrateDB Cloud).
# CRATEDB_HTTP_SCHEME=https
# CRATEDB_HOST=example.aks1.westeurope.azure.cratedb.net
# CRATEDB_USERNAME='admin'
# CRATEDB_PASSWORD='<PASSWORD>'

# Needs to be computed here.
CRATEDB_HTTP_URL="${CRATEDB_HTTP_SCHEME}://${CRATEDB_USERNAME}:${CRATEDB_PASSWORD}@${CRATEDB_HOST}:${CRATEDB_HTTP_PORT}"
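
For illustration, with the defaults above (empty password, hostname `cratedb` as provided by the Compose network), the computed URL expands like this:

    # Sketch: inspect the computed URL after sourcing the file in bash.
    source .env
    echo "${CRATEDB_HTTP_URL}"
    # http://crate:@cratedb:4200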

application/ingestr/.gitignore

Lines changed: 7 additions & 0 deletions
*.json
*.ndjson
*.tar.gz
*.jar
.venv*
__pycache__
!.env

application/ingestr/README.md

Lines changed: 11 additions & 0 deletions
# Use CrateDB with ingestr

[ingestr] is a command-line application that copies data
from any source into any destination database.

This folder includes runnable examples that use ingestr with CrateDB.
They also serve as integration tests to ensure the software components
work well together.

[ingestr]: https://bruin-data.github.io/ingestr/
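
As a reference point, the `kafka-cmd.sh` driver in this commit boils down to an ingestr invocation like the following; hostnames and ports assume the accompanying Docker Compose setup:

    # Copy data from a Kafka topic into a CrateDB table.
    ingestr ingest --yes \
      --source-uri "kafka://?bootstrap_servers=localhost:9094&group_id=test_group" \
      --source-table "demo" \
      --dest-uri "cratedb://crate:crate@localhost:5432/?sslmode=disable" \
      --dest-table "doc.kafka_demo"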

application/ingestr/kafka-cmd.sh

Lines changed: 197 additions & 0 deletions
#!/bin/bash

# =====
# About
# =====

# End-to-end test feeding data through a pipeline implemented with Apache Kafka,
# ingestr, and CrateDB. The data source is a file in NDJSON format; the
# data sink is a database table in CrateDB.


# ============
# Main program
# ============

source .env
source util.sh
set -e

COMPOSE_FILE=kafka-compose.yml

# ----------------------------
# Infrastructure and resources
# ----------------------------

function start-services() {
  title "Starting services"
  docker compose --file ${COMPOSE_FILE} up --detach
}

function stop-services() {
  title "Stopping services"
  docker compose --file ${COMPOSE_FILE} down --remove-orphans
}

function setup() {
  delete-topic
  # drop-table # Please do not drop tables with ingestr
  # create-table # ingestr creates the table itself
  create-topic
}

function teardown() {
  delete-topic
  # drop-table # Please do not drop tables with ingestr
}


# -----------
# Data source
# -----------

function create-topic() {
  title "Creating Kafka topic"
  docker compose --file ${COMPOSE_FILE} run --rm create-topic
  echo "Done."
}

function delete-topic() {
  title "Deleting Kafka topic"
  docker compose --file ${COMPOSE_FILE} run --rm --no-TTY delete-topic
  echo "Done."
}


# --------
# Pipeline
# --------

function invoke-job() {

  # Invoke ingestr job.
  title "Invoking ingestr job"

  uvx --python="3.12" --prerelease="allow" --with-requirements="requirements.txt" ingestr ingest --yes \
    --source-uri "kafka://?bootstrap_servers=localhost:${PORT_KAFKA_BROKER_EXTERNAL}&group_id=test_group" \
    --source-table "demo" \
    --dest-uri "cratedb://crate:crate@localhost:5432/?sslmode=disable" \
    --dest-table "doc.kafka_demo"

  echo "Done."
}

function feed-data() {

  if [[ ! -f nyc-yellow-taxi-2017-subset.ndjson ]]; then

    title "Acquiring NDJSON data"

    # Acquire NYC Taxi 2017 dataset in JSON format (~90 MB).
    wget --no-clobber --continue https://gist.githubusercontent.com/kovrus/328ba1b041dfbd89e55967291ba6e074/raw/7818724cb64a5d283db7f815737c9e198a22bee4/nyc-yellow-taxi-2017.tar.gz

    # Extract archive.
    tar -xvf nyc-yellow-taxi-2017.tar.gz

    # Create a subset of the data (5000 records) to keep the first steps quick.
    cat nyc-yellow-taxi-2017.json | head -n 5000 > nyc-yellow-taxi-2017-subset.ndjson

  fi

  # Publish data to the Kafka topic.
  title "Publishing NDJSON data to Kafka topic"
  cat nyc-yellow-taxi-2017-subset.ndjson | docker compose --file ${COMPOSE_FILE} run --rm --no-TTY publish-data
  echo "Done."

  # Wait a bit for the data to transfer and converge successfully.
  sleep 3
}


# ---------
# Data sink
# ---------

function create-table() {
  title "Creating CrateDB table"
  docker compose --file ${COMPOSE_FILE} run --rm create-table
  echo "Done."
}

function display-data() {

  title "Displaying data in CrateDB"

  docker compose --file ${COMPOSE_FILE} run --rm httpie \
    http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='REFRESH TABLE "kafka_demo";' --ignore-stdin > /dev/null

  docker compose --file ${COMPOSE_FILE} run --rm httpie \
    http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='SELECT * FROM "kafka_demo" LIMIT 5;' --ignore-stdin

  docker compose --file ${COMPOSE_FILE} run --rm httpie \
    http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='SELECT COUNT(*) FROM "kafka_demo";' --ignore-stdin

}

function verify-data() {
  title "Verifying data in CrateDB"
  size_reference=5000
  size_actual=$(
    docker compose --file ${COMPOSE_FILE} run --rm httpie \
      http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='SELECT COUNT(*) FROM "kafka_demo";' --ignore-stdin \
      | jq .rows[0][0]
  )
  if [[ ${size_actual} = ${size_reference} ]]; then
    echo -e "${BGREEN}SUCCESS: Database table contains expected number of ${size_reference} records.${NC}"
  else
    echo -e "${BRED}ERROR: Expected database table to contain ${size_reference} records, but it contains ${size_actual} records.${NC}"
    exit 2
  fi
}

function drop-table() {
  title "Dropping CrateDB table"
  docker compose --file ${COMPOSE_FILE} run --rm drop-table
  echo "Done."
}


# ------
# Recipe
# ------

function main() {

  # Start services and set up resources.
  start-services
  setup

  # Acquire data, feed data to the Kafka topic, invoke the import job,
  # and verify the data has been stored in CrateDB.
  feed-data
  invoke-job
  display-data
  verify-data

  # Clean up resources.
  #teardown

  # Tear down only when the `--keepalive` option has not been given.
  #test -z "${KEEPALIVE}" && stop-services
}

function start_subcommand() {
  if test -n "${SUBCOMMAND}"; then
    ${SUBCOMMAND}
    exit
  fi
}

function start() {
  #read_options
  init_colors
  start_subcommand
  main
}

start
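
For context on the `jq .rows[0][0]` expression in `verify-data`: CrateDB's HTTP endpoint wraps result sets in a JSON document with `cols` and `rows` arrays, so the COUNT(*) query returns a response shaped roughly like this (duration value illustrative):

    {
      "cols": ["count(*)"],
      "rows": [[5000]],
      "rowcount": 1,
      "duration": 1.23
    }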
