Skip to content

Commit ceec7a9

Browse files
committed
Ingestr: Add example using a Kafka data source
1 parent 801dde0 commit ceec7a9

File tree

10 files changed

+512
-0
lines changed

10 files changed

+512
-0
lines changed

.github/dependabot.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,16 @@ updates:
120120
schedule:
121121
interval: "daily"
122122

123+
- directory: "/application/ingestr"
124+
package-ecosystem: "pip"
125+
schedule:
126+
interval: "daily"
127+
128+
- directory: "/application/ingestr"
129+
package-ecosystem: "docker-compose"
130+
schedule:
131+
interval: "daily"
132+
123133
- directory: "/application/metabase"
124134
package-ecosystem: "pip"
125135
schedule:
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
name: "Ingestr"

on:
  # Run on pull requests and pushes to `main`, but only when the
  # workflow itself or the example application changed.
  pull_request:
    paths:
    - '.github/workflows/application-ingestr.yml'
    - 'application/ingestr/**'
  push:
    branches: [ main ]
    paths:
    - '.github/workflows/application-ingestr.yml'
    - 'application/ingestr/**'

  # Allow job to be triggered manually.
  workflow_dispatch:

  # Run job each night after CrateDB nightly has been published.
  schedule:
    - cron: '0 3 * * *'

# Cancel in-progress jobs when pushing to the same branch.
concurrency:
  cancel-in-progress: true
  group: ${{ github.workflow }}-${{ github.ref }}

jobs:

  test:
    runs-on: ${{ matrix.os }}

    # Matrix currently holds a single OS; kept as a matrix so more
    # platforms can be added without restructuring the job.
    strategy:
      fail-fast: true
      matrix:
        os: [ "ubuntu-latest" ]

    name: OS ${{ matrix.os }}
    steps:

    - name: Acquire sources
      uses: actions/checkout@v4

    - name: Validate application/ingestr
      run: |
        # TODO: Generalize invocation into `ngr` test runner.
        cd application/ingestr
        bash test.sh

application/ingestr/.env

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Software component versions.
2+
CONFLUENT_VERSION=7.9.2
3+
CRATEDB_VERSION=5.10
4+
KCAT_VERSION=1.7.1
5+
6+
# Data source configuration (Kafka).
7+
PORT_KAFKA_BROKER_INTERNAL=9092
8+
PORT_KAFKA_BROKER_EXTERNAL=9094
9+
PORT_KAFKA_ZOOKEEPER=2181
10+
11+
# Data sink configuration (CrateDB).
12+
CRATEDB_HOST=${CRATEDB_HOST:-cratedb}
13+
CRATEDB_HTTP_PORT=${CRATEDB_HTTP_PORT:-4200}
14+
CRATEDB_POSTGRESQL_PORT=${CRATEDB_POSTGRESQL_PORT:-5432}
15+
CRATEDB_USERNAME=${CRATEDB_USERNAME:-crate}
16+
CRATEDB_PASSWORD=${CRATEDB_PASSWORD:-}
17+
CRATEDB_HTTP_SCHEME=${CRATEDB_HTTP_SCHEME:-http}
18+
19+
# Data sink configuration (CrateDB Cloud).
20+
# CRATEDB_HTTP_SCHEME=https
21+
# CRATEDB_HOST=example.aks1.westeurope.azure.cratedb.net
22+
# CRATEDB_USERNAME='admin'
23+
# CRATEDB_PASSWORD='<PASSWORD>'
24+
25+
# Needs to be computed here.
26+
CRATEDB_HTTP_URL="${CRATEDB_HTTP_SCHEME}://${CRATEDB_USERNAME}:${CRATEDB_PASSWORD}@${CRATEDB_HOST}:${CRATEDB_HTTP_PORT}"

application/ingestr/.gitignore

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Downloaded datasets and extracted archives.
*.json
*.ndjson
*.tar.gz
*.jar
# Python virtual environments and bytecode caches.
.venv*
__pycache__
# Re-include the environment settings file — presumably excluded by a
# broader ignore rule elsewhere; it carries defaults, not secrets.
!.env

application/ingestr/README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# Use CrateDB with ingestr
2+
3+
[ingestr] is a command-line application that allows copying data
4+
from any source into any destination database.
5+
6+
This folder includes runnable examples that use ingestr with CrateDB.
7+
They are also used as integration tests to ensure software components
8+
fit together well.
9+
10+
11+
[ingestr]: https://bruin-data.github.io/ingestr/

application/ingestr/kafka-cmd.sh

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
#!/bin/bash

# =====
# About
# =====

# End-to-end test feeding data through a pipeline implemented with Apache Kafka,
# ingestr, and CrateDB. The data source is a file in NDJSON format, the
# data sink is a database table in CrateDB.


# ============
# Main program
# ============

# Enable strict error handling *before* sourcing the configuration and
# utility files, so failures inside them abort the run as well.
# `pipefail` makes pipeline stages (e.g. `docker ... | jq`) fail loudly.
set -e
set -o pipefail

# Load environment settings and helper functions (`title`, `init_colors`, ...).
source .env
source util.sh

# Docker Compose definition used by all commands in this script.
COMPOSE_FILE=kafka-compose.yml
21+
22+
# ----------------------------
23+
# Infrastructure and resources
24+
# ----------------------------
25+
26+
function start-services() {
  # Bring up the whole service stack defined in the Compose file,
  # detached, so the remainder of the recipe can proceed.
  title "Starting services"
  docker compose --file "${COMPOSE_FILE}" up --detach
}
30+
31+
function stop-services() {
  # Shut the service stack down again, also removing any orphaned
  # containers left over from previous runs.
  title "Stopping services"
  docker compose --file "${COMPOSE_FILE}" down --remove-orphans
}
35+
36+
function setup() {
  # Provision the resources the pipeline needs.
  # Table creation in CrateDB is currently left to the import job itself.
  # title "Creating CrateDB table"
  # docker compose --file ${COMPOSE_FILE} run --rm create-table
  title "Creating Kafka topic"
  docker compose --file "${COMPOSE_FILE}" run --rm create-topic
  echo "Done."
}
43+
44+
function teardown() {
  # Remove the resources created by `setup` and by the import job.
  title "Deleting Kafka topic"
  docker compose --file "${COMPOSE_FILE}" run --rm --no-TTY delete-topic
  title "Dropping CrateDB table"
  docker compose --file "${COMPOSE_FILE}" run --rm drop-table
}
50+
51+
52+
# --------
53+
# Pipeline
54+
# --------
55+
56+
function invoke-job() {

  # Invoke the ingestr data transfer job: read from the Kafka topic,
  # write into the CrateDB table `doc.demo`.
  title "Invoking ingestr job"

  # ingestr runs on the host, so it connects via `localhost` and the
  # externally published ports, while credentials and the PostgreSQL
  # port honor the overridable settings from `.env` instead of being
  # hard-coded. With the default empty CRATEDB_PASSWORD, the `:-crate`
  # fallback reproduces the standard `crate:crate` credentials.
  uvx --python="3.12" --prerelease="allow" --with-requirements="requirements.txt" ingestr ingest --yes \
    --source-uri "kafka://?bootstrap_servers=localhost:${PORT_KAFKA_BROKER_EXTERNAL}&group_id=test_group" \
    --source-table "demo" \
    --dest-uri "cratedb://${CRATEDB_USERNAME:-crate}:${CRATEDB_PASSWORD:-crate}@localhost:${CRATEDB_POSTGRESQL_PORT:-5432}/?sslmode=disable" \
    --dest-table "doc.demo"

  echo "Done."
}
69+
70+
function feed-data() {

  # Acquire the example dataset (once) and publish it to the Kafka topic.

  if [[ ! -f nyc-yellow-taxi-2017-subset.ndjson ]]; then

    title "Acquiring NDJSON data"

    # Acquire NYC Taxi 2017 dataset in JSON format (~90 MB).
    wget --no-clobber --continue https://gist.githubusercontent.com/kovrus/328ba1b041dfbd89e55967291ba6e074/raw/7818724cb64a5d283db7f815737c9e198a22bee4/nyc-yellow-taxi-2017.tar.gz

    # Extract archive.
    tar -xvf nyc-yellow-taxi-2017.tar.gz

    # Create a subset of the data (5000 records) for concluding the first steps.
    # `head` reads the file directly; no `cat` pipeline needed.
    head -n 5000 nyc-yellow-taxi-2017.json > nyc-yellow-taxi-2017-subset.ndjson

  fi

  # Publish data to the Kafka topic, feeding the file straight into the
  # container's stdin instead of going through `cat`.
  title "Publishing NDJSON data to Kafka topic"
  docker compose --file "${COMPOSE_FILE}" run --rm --no-TTY publish-data < nyc-yellow-taxi-2017-subset.ndjson
  echo "Done."

  # Wait a bit for the data to transfer and converge successfully.
  sleep 3
}
95+
96+
97+
# ---------
98+
# Data sink
99+
# ---------
100+
101+
function display-data() {

  # Show a sample and the record count of the data that arrived in CrateDB.
  title "Displaying data in CrateDB"

  # Make recently written records visible to the queries below.
  docker compose --file "${COMPOSE_FILE}" run --rm httpie \
    http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='REFRESH TABLE "demo";' --ignore-stdin > /dev/null

  local statement
  for statement in 'SELECT * FROM "demo" LIMIT 5;' 'SELECT COUNT(*) FROM "demo";'; do
    docker compose --file "${COMPOSE_FILE}" run --rm httpie \
      http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt="${statement}" --ignore-stdin
  done
}
115+
116+
function verify-data() {
  # Verify that the expected number of records arrived in CrateDB.
  # Exits with status 2 on a mismatch.
  title "Verifying data in CrateDB"
  local size_reference=5000
  local size_actual
  # Declaration split from assignment so the command's exit status is
  # not masked by `local`.
  size_actual=$(
    docker compose --file "${COMPOSE_FILE}" run --rm httpie \
      http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='SELECT COUNT(*) FROM "demo";' --ignore-stdin \
      | jq .rows[0][0]
  )
  # Counts are numbers: compare arithmetically with quoted operands.
  # The original unquoted `=` comparison was a string/pattern match and
  # would misbehave on glob metacharacters in an unexpected response.
  if [[ "${size_actual}" -eq "${size_reference}" ]]; then
    echo -e "${BGREEN}SUCCESS: Database table contains expected number of ${size_reference} records.${NC}"
  else
    echo -e "${BRED}ERROR: Expected database table to contain ${size_reference} records, but it contains ${size_actual} records.${NC}"
    exit 2
  fi
}
131+
132+
133+
# ------
134+
# Recipe
135+
# ------
136+
137+
function main() {

  # Bring up the service stack and provision resources.
  start-services
  setup

  # Acquire the dataset, publish it to the Kafka topic, run the
  # ingestr import job, and validate the outcome in CrateDB.
  feed-data
  invoke-job
  display-data
  verify-data

  # Resource cleanup is currently disabled, keeping the stack around
  # for inspection after a run.
  #teardown

  # Tear down only when `--keepalive` option has not been given.
  #test -z "${KEEPALIVE}" && stop-services
}
156+
157+
function start_subcommand() {
  # When `SUBCOMMAND` names a routine, run only that routine and
  # terminate; otherwise fall through so the full recipe can run.
  if [[ -n "${SUBCOMMAND}" ]]; then
    # Intentionally unquoted, so a subcommand may carry arguments.
    ${SUBCOMMAND}
    exit
  fi
}
163+
164+
function start() {
  # Entry point: set up terminal colors, optionally dispatch a single
  # subcommand, and otherwise run the complete recipe.
  #read_options
  init_colors
  start_subcommand
  main
}

start

0 commit comments

Comments
 (0)