
Commit bdb738c

Ingestr: Add example using a Kafka data source

1 parent 801dde0 commit bdb738c
File tree

11 files changed: +567 -0 lines changed

.github/dependabot.yml

Lines changed: 10 additions & 0 deletions
```diff
@@ -120,6 +120,16 @@ updates:
     schedule:
       interval: "daily"
 
+  - directory: "/application/ingestr"
+    package-ecosystem: "pip"
+    schedule:
+      interval: "daily"
+
+  - directory: "/application/ingestr"
+    package-ecosystem: "docker-compose"
+    schedule:
+      interval: "daily"
+
   - directory: "/application/metabase"
     package-ecosystem: "pip"
     schedule:
```
.github/workflows/application-ingestr.yml

Lines changed: 46 additions & 0 deletions

```yaml
name: "Ingestr"

on:
  pull_request:
    paths:
    - '.github/workflows/application-ingestr.yml'
    - 'application/ingestr/**'
  push:
    branches: [ main ]
    paths:
    - '.github/workflows/application-ingestr.yml'
    - 'application/ingestr/**'

  # Allow job to be triggered manually.
  workflow_dispatch:

  # Run job each night after CrateDB nightly has been published.
  schedule:
    - cron: '0 3 * * *'

# Cancel in-progress jobs when pushing to the same branch.
concurrency:
  cancel-in-progress: true
  group: ${{ github.workflow }}-${{ github.ref }}

jobs:

  test:
    runs-on: ${{ matrix.os }}

    strategy:
      fail-fast: true
      matrix:
        os: [ "ubuntu-latest" ]

    name: OS ${{ matrix.os }}
    steps:

      - name: Acquire sources
        uses: actions/checkout@v4

      - name: Validate application/ingestr
        run: |
          # TODO: Generalize invocation into `ngr` test runner.
          cd application/ingestr
          bash test.sh
```
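Because the workflow declares a `workflow_dispatch` trigger, it can also be started by hand. A minimal sketch using the GitHub CLI, assuming `gh` is installed and authenticated against this repository:

```shell
# Trigger the "Ingestr" workflow manually via the GitHub CLI.
gh workflow run "Ingestr"

# Follow the run that was just started.
gh run watch
```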

application/ingestr/.dlt/config.toml

Lines changed: 3 additions & 0 deletions
```toml
[runtime]

dlthub_telemetry=false
```
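dlt also reads settings from environment variables using uppercase `SECTION__KEY` names; the exact variable below is an assumption following that convention, not something this commit configures:

```shell
# Assumed environment-variable equivalent of the TOML setting above.
export RUNTIME__DLTHUB_TELEMETRY=false
```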

application/ingestr/.env

Lines changed: 26 additions & 0 deletions
```shell
# Software component versions.
CONFLUENT_VERSION=7.9.2
CRATEDB_VERSION=5.10
KCAT_VERSION=1.7.1

# Data source configuration (Kafka).
PORT_KAFKA_BROKER_INTERNAL=9092
PORT_KAFKA_BROKER_EXTERNAL=9094
PORT_KAFKA_ZOOKEEPER=2181

# Data sink configuration (CrateDB).
CRATEDB_HOST=${CRATEDB_HOST:-cratedb}
CRATEDB_HTTP_PORT=${CRATEDB_HTTP_PORT:-4200}
CRATEDB_POSTGRESQL_PORT=${CRATEDB_POSTGRESQL_PORT:-5432}
CRATEDB_USERNAME=${CRATEDB_USERNAME:-crate}
CRATEDB_PASSWORD=${CRATEDB_PASSWORD:-}
CRATEDB_HTTP_SCHEME=${CRATEDB_HTTP_SCHEME:-http}

# Data sink configuration (CrateDB Cloud).
# CRATEDB_HTTP_SCHEME=https
# CRATEDB_HOST=example.aks1.westeurope.azure.cratedb.net
# CRATEDB_USERNAME='admin'
# CRATEDB_PASSWORD='<PASSWORD>'

# Needs to be computed here.
CRATEDB_HTTP_URL="${CRATEDB_HTTP_SCHEME}://${CRATEDB_USERNAME}:${CRATEDB_PASSWORD}@${CRATEDB_HOST}:${CRATEDB_HTTP_PORT}"
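```

As a quick sanity check, the composed `CRATEDB_HTTP_URL` can be exercised against CrateDB's HTTP endpoint. A minimal sketch, assuming the stack is running and `curl` is available on the host; the test scripts themselves use httpie through Docker Compose:

```shell
# Point at the locally published port; the default host "cratedb" only
# resolves inside the Compose network.
export CRATEDB_HOST=localhost
source .env

# Issue a trivial query against CrateDB's /_sql endpoint.
curl -sS "${CRATEDB_HTTP_URL}/_sql?pretty" \
  -H 'Content-Type: application/json' \
  -d '{"stmt": "SELECT 1"}'
```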

application/ingestr/.gitignore

Lines changed: 7 additions & 0 deletions
```
*.json
*.ndjson
*.tar.gz
*.jar
.venv*
__pycache__
!.env
```

application/ingestr/README.md

Lines changed: 22 additions & 0 deletions
````markdown
# Use CrateDB with ingestr

## About

[ingestr] is a command-line application that allows copying data
from any source into any destination database.

This folder includes runnable examples that use ingestr with CrateDB.
They are also used as integration tests to ensure software components
fit together well.

## Usage

To iterate quickly without tearing down the backend stack each time,
use the `KEEPALIVE` environment variable.
```shell
export KEEPALIVE=true
sh test.sh
```

[ingestr]: https://bruin-data.github.io/ingestr/
````

application/ingestr/kafka-cmd.sh

Lines changed: 199 additions & 0 deletions
```bash
#!/usr/bin/env bash

# =====
# About
# =====

# End-to-end test feeding data through a pipeline implemented with Apache Kafka,
# ingestr, and CrateDB. The data source is a file in NDJSON format, the
# data sink is a database table in CrateDB.


# ============
# Main program
# ============

set -eu

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
source "${SCRIPT_DIR}/.env"
source "${SCRIPT_DIR}/util.sh"

COMPOSE_FILE=kafka-compose.yml

# ----------------------------
# Infrastructure and resources
# ----------------------------

function start-services() {
  title "Starting services"
  docker compose --file ${COMPOSE_FILE} up --detach
}

function stop-services() {
  title "Stopping services"
  docker compose --file ${COMPOSE_FILE} down --remove-orphans
}

function setup() {
  delete-topic
  # drop-table   # Please do not drop tables with ingestr
  # create-table # ingestr creates the table itself
  create-topic
}

function teardown() {
  delete-topic
  # drop-table   # Please do not drop tables with ingestr
}


# -----------
# Data source
# -----------

function create-topic() {
  title "Creating Kafka topic"
  docker compose --file ${COMPOSE_FILE} run --rm create-topic
  echo "Done."
}

function delete-topic() {
  title "Deleting Kafka topic"
  docker compose --file ${COMPOSE_FILE} run --rm --no-TTY delete-topic
  echo "Done."
}


# --------
# Pipeline
# --------

function invoke-job() {

  # Invoke ingestr job.
  title "Invoking ingestr job"

  uvx --python="3.12" --prerelease="allow" --with-requirements="requirements.txt" ingestr ingest --yes \
    --source-uri "kafka://?bootstrap_servers=localhost:${PORT_KAFKA_BROKER_EXTERNAL}&group_id=test_group" \
    --source-table "demo" \
    --dest-uri "cratedb://crate:crate@localhost:5432/?sslmode=disable" \
    --dest-table "doc.kafka_demo"

  echo "Done."
}

function feed-data() {

  if [[ ! -f nyc-yellow-taxi-2017-subset.ndjson ]]; then

    title "Acquiring NDJSON data"

    # Acquire NYC Taxi 2017 dataset in JSON format (~90 MB).
    wget --no-clobber --continue https://gist.githubusercontent.com/kovrus/328ba1b041dfbd89e55967291ba6e074/raw/7818724cb64a5d283db7f815737c9e198a22bee4/nyc-yellow-taxi-2017.tar.gz

    # Extract archive.
    tar -xvf nyc-yellow-taxi-2017.tar.gz

    # Create a subset of the data (5000 records) for the first steps.
    cat nyc-yellow-taxi-2017.json | head -n 5000 > nyc-yellow-taxi-2017-subset.ndjson

  fi

  # Publish data to the Kafka topic.
  title "Publishing NDJSON data to Kafka topic"
  cat nyc-yellow-taxi-2017-subset.ndjson | docker compose --file ${COMPOSE_FILE} run --rm --no-TTY publish-data
  echo "Done."

  # Wait a bit for the data to transfer and converge successfully.
  sleep 3
}


# ---------
# Data sink
# ---------

function create-table() {
  title "Creating CrateDB table"
  docker compose --file ${COMPOSE_FILE} run --rm create-table
  echo "Done."
}

function display-data() {

  title "Displaying data in CrateDB"

  docker compose --file ${COMPOSE_FILE} run --rm httpie \
    http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='REFRESH TABLE "kafka_demo";' --ignore-stdin > /dev/null

  docker compose --file ${COMPOSE_FILE} run --rm httpie \
    http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='SELECT * FROM "kafka_demo" LIMIT 5;' --ignore-stdin

  docker compose --file ${COMPOSE_FILE} run --rm httpie \
    http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='SELECT COUNT(*) FROM "kafka_demo";' --ignore-stdin

}

function verify-data() {
  title "Verifying data in CrateDB"
  size_reference=5000
  size_actual=$(
    docker compose --file ${COMPOSE_FILE} run --rm httpie \
      http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='SELECT COUNT(*) FROM "kafka_demo";' --ignore-stdin \
      | jq .rows[0][0]
  )
  if [[ "${size_actual}" = "${size_reference}" ]]; then
    echo -e "${BGREEN}SUCCESS: Database table contains expected number of ${size_reference} records.${NC}"
  else
    echo -e "${BRED}ERROR: Expected database table to contain ${size_reference} records, but it contains ${size_actual} records.${NC}"
    exit 2
  fi
}

function drop-table() {
  title "Dropping CrateDB table"
  docker compose --file ${COMPOSE_FILE} run --rm drop-table
  echo "Done."
}


# ------
# Recipe
# ------

function main() {

  # Start services and set up resources.
  start-services
  setup

  # Acquire data, feed data to Kafka topic, invoke import job,
  # and verify data has been stored into CrateDB.
  feed-data
  invoke-job
  display-data
  verify-data

  # Clean up resources.
  # teardown # Do not tear down, for inspection purposes.

  # Tear down only when the `KEEPALIVE` environment variable has not been set.
  test -z "${KEEPALIVE-}" && stop-services
}

function start_subcommand() {
  if test -n "${SUBCOMMAND-}"; then
    ${SUBCOMMAND-}
    exit
  fi
}

function start() {
  #read_options
  init_colors
  start_subcommand
  main
}

start
```
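How the script is meant to be driven, judging from `main`, `start_subcommand`, and the `KEEPALIVE` guard; the `SUBCOMMAND` form is inferred from the code rather than documented:

```shell
# Run the full recipe: start services, feed data, ingest, verify.
bash kafka-cmd.sh

# Keep the backend stack running between iterations.
KEEPALIVE=true bash kafka-cmd.sh

# Run a single step only, e.g. re-publishing the NDJSON data.
SUBCOMMAND=feed-data bash kafka-cmd.sh
```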
