Ingestr: Add runnable example loading from Kafka #1013
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
name: "Ingestr" | ||
|
||
on: | ||
  pull_request: | ||
    paths: | ||
    - '.github/workflows/application-ingestr.yml' | ||
    - 'application/ingestr/**' | ||
  push: | ||
    branches: [ main ] | ||
    paths: | ||
    - '.github/workflows/application-ingestr.yml' | ||
    - 'application/ingestr/**' | ||
|
||
  # Allow job to be triggered manually. | ||
  workflow_dispatch: | ||
|
||
  # Run job each night after CrateDB nightly has been published. | ||
  schedule: | ||
    - cron: '0 3 * * *' | ||
|
||
# Cancel in-progress jobs when pushing to the same branch. | ||
concurrency: | ||
  cancel-in-progress: true | ||
  group: ${{ github.workflow }}-${{ github.ref }} | ||
|
||
jobs: | ||
|
||
  test: | ||
    runs-on: ${{ matrix.os }} | ||
|
||
    strategy: | ||
      fail-fast: true | ||
      matrix: | ||
        os: [ "ubuntu-latest" ] | ||
|
||
    name: OS ${{ matrix.os }} | ||
    steps: | ||
|
||
      - name: Acquire sources | ||
        uses: actions/checkout@v4 | ||
|
||
      - name: Validate application/ingestr | ||
        run: | | ||
          # TODO: Generalize invocation into `ngr` test runner. | ||
          cd application/ingestr | ||
          bash test.sh |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
[runtime] | ||
|
||
dlthub_telemetry=false |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
# Software component versions. | ||
CONFLUENT_VERSION=7.9.2 | ||
CRATEDB_VERSION=5.10 | ||
KCAT_VERSION=1.7.1 | ||
|
||
# Data source configuration (Kafka). | ||
PORT_KAFKA_BROKER_INTERNAL=9092 | ||
PORT_KAFKA_BROKER_EXTERNAL=9094 | ||
PORT_KAFKA_ZOOKEEPER=2181 | ||
|
||
# Data sink configuration (CrateDB). | ||
CRATEDB_HOST=${CRATEDB_HOST:-cratedb} | ||
CRATEDB_HTTP_PORT=${CRATEDB_HTTP_PORT:-4200} | ||
CRATEDB_POSTGRESQL_PORT=${CRATEDB_POSTGRESQL_PORT:-5432} | ||
CRATEDB_USERNAME=${CRATEDB_USERNAME:-crate} | ||
CRATEDB_PASSWORD=${CRATEDB_PASSWORD:-} | ||
CRATEDB_HTTP_SCHEME=${CRATEDB_HTTP_SCHEME:-http} | ||
|
||
# Data sink configuration (CrateDB Cloud). | ||
# CRATEDB_HTTP_SCHEME=https | ||
# CRATEDB_HOST=example.aks1.westeurope.azure.cratedb.net | ||
# CRATEDB_USERNAME='admin' | ||
# CRATEDB_PASSWORD='<PASSWORD>' | ||
|
||
# Needs to be computed here. | ||
CRATEDB_HTTP_URL="${CRATEDB_HTTP_SCHEME}://${CRATEDB_USERNAME}:${CRATEDB_PASSWORD}@${CRATEDB_HOST}:${CRATEDB_HTTP_PORT}" |
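The `${VAR:-default}` expansions in this file let the calling environment override each setting, and the composed URL has to come last because it reads the resolved values. A minimal sketch of that behaviour, reusing the variable names from the file; the `compose_url` helper and the override host `db.example.org` are illustrative assumptions, not part of the PR:

```shell
#!/usr/bin/env bash
# Sketch: how `${VAR:-default}` resolves and how the HTTP URL is composed.
# Variable names mirror the .env file; `compose_url` is a hypothetical helper.
set -eu

compose_url() {
    # Each setting falls back to its default when unset or empty.
    local scheme="${CRATEDB_HTTP_SCHEME:-http}"
    local host="${CRATEDB_HOST:-cratedb}"
    local port="${CRATEDB_HTTP_PORT:-4200}"
    local user="${CRATEDB_USERNAME:-crate}"
    local pass="${CRATEDB_PASSWORD:-}"
    echo "${scheme}://${user}:${pass}@${host}:${port}"
}

# Resolve with whatever the environment provides, else the defaults.
compose_url

# Override a single value for one invocation only (placeholder host).
CRATEDB_HOST=db.example.org compose_url
```

With none of the variables set, the password part stays empty, so the URL contains `crate:@` before the host name.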
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
*.json | ||
*.ndjson | ||
*.tar.gz | ||
*.jar | ||
.venv* | ||
__pycache__ | ||
!.env |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
# Use CrateDB with ingestr | ||
|
||
## About | ||
|
||
[ingestr] is a command-line application that allows copying data | ||
from any source into any destination database. | ||
|
||
This folder includes runnable examples that use ingestr with CrateDB. | ||
They are also used as integration tests to ensure software components | ||
fit together well. | ||
|
||
## Usage | ||
|
||
To iterate without tearing down the backend stack after each run, | ||
set the `KEEPALIVE` environment variable. | ||
```shell | ||
export KEEPALIVE=true | ||
bash test.sh | ||
``` | ||
|
||
|
||
[ingestr]: https://bruin-data.github.io/ingestr/ | ||
Comment on lines +1 to +22
Backlog:
|
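One way to implement the `KEEPALIVE` switch described in the README is an explicit `if` on `${KEEPALIVE-}`; the `-` expansion keeps `set -u` from aborting when the variable is unset. A minimal sketch, in which `maybe_stop` and the echoed messages are hypothetical stand-ins for the script's real teardown step:

```shell
#!/usr/bin/env bash
set -eu

# Hypothetical stand-in for tearing down the Docker Compose stack.
maybe_stop() {
    # `${KEEPALIVE-}` expands to an empty string when the variable is unset,
    # so this test is safe under `set -u`.
    if [[ -z "${KEEPALIVE-}" ]]; then
        echo "stopping services"
    else
        echo "keeping services alive"
    fi
}

maybe_stop
KEEPALIVE=true maybe_stop
```

The `if` form returns a zero status in both branches. A `test -z "${KEEPALIVE-}" && some-command` one-liner, by contrast, returns non-zero when the variable is set, which can abort the caller under `set -e` if it is the last command of a function.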
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,199 @@ | ||
#!/usr/bin/env bash | ||
|
||
# ===== | ||
# About | ||
# ===== | ||
|
||
# End-to-end test feeding data through a pipeline implemented with Apache Kafka, | ||
# ingestr, and CrateDB. The data source is a file in NDJSON format, the | ||
# data sink is a database table in CrateDB. | ||
|
||
|
||
# ============ | ||
# Main program | ||
# ============ | ||
|
||
set -eu | ||
|
||
amotl marked this conversation as resolved.
|
||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" | ||
source "${SCRIPT_DIR}/.env" | ||
source "${SCRIPT_DIR}/util.sh" | ||
|
||
COMPOSE_FILE="${SCRIPT_DIR}/kafka-compose.yml" | ||
|
||
# ---------------------------- | ||
# Infrastructure and resources | ||
# ---------------------------- | ||
|
||
function start-services() { | ||
  title "Starting services" | ||
  docker compose --file "${COMPOSE_FILE}" up --detach | ||
} | ||
|
||
function stop-services() { | ||
  title "Stopping services" | ||
  docker compose --file "${COMPOSE_FILE}" down --remove-orphans | ||
} | ||
|
||
function setup() { | ||
  delete-topic | ||
  # drop-table # Please do not drop tables with ingestr | ||
  # create-table # ingestr creates the table itself | ||
  create-topic | ||
} | ||
|
||
function teardown() { | ||
  delete-topic | ||
  # drop-table # Please do not drop tables with ingestr | ||
} | ||
|
||
|
||
# ----------- | ||
# Data source | ||
# ----------- | ||
|
||
function create-topic() { | ||
  title "Creating Kafka topic" | ||
  docker compose --file "${COMPOSE_FILE}" run --rm create-topic | ||
  echo "Done." | ||
} | ||
|
||
function delete-topic() { | ||
  title "Deleting Kafka topic" | ||
  docker compose --file "${COMPOSE_FILE}" run --rm --no-TTY delete-topic | ||
  echo "Done." | ||
} | ||
|
||
|
||
# -------- | ||
# Pipeline | ||
# -------- | ||
|
||
function invoke-job() { | ||
|
||
  # Invoke ingestr job. | ||
  title "Invoking ingestr job" | ||
|
||
  uvx --python="3.12" --prerelease="allow" --with-requirements="requirements.txt" ingestr ingest --yes \ | ||
    --source-uri "kafka://?bootstrap_servers=localhost:${PORT_KAFKA_BROKER_EXTERNAL}&group_id=test_group" \ | ||
    --source-table "demo" \ | ||
    --dest-uri "cratedb://crate:crate@localhost:5432/?sslmode=disable" \ | ||
    --dest-table "doc.kafka_demo" | ||
|
||
  echo "Done." | ||
} | ||
|
||
function feed-data() { | ||
|
||
  if [[ ! -f nyc-yellow-taxi-2017-subset.ndjson ]]; then | ||
|
||
    title "Acquiring NDJSON data" | ||
|
||
    # Acquire NYC Taxi 2017 dataset in JSON format (~90 MB). | ||
    wget --no-clobber --continue https://gist.githubusercontent.com/kovrus/328ba1b041dfbd89e55967291ba6e074/raw/7818724cb64a5d283db7f815737c9e198a22bee4/nyc-yellow-taxi-2017.tar.gz | ||
|
||
    # Extract archive. | ||
    tar -xvf nyc-yellow-taxi-2017.tar.gz | ||
|
||
    # Create a subset of the data (5000 records) for the first steps. | ||
    head -n 5000 nyc-yellow-taxi-2017.json > nyc-yellow-taxi-2017-subset.ndjson | ||
|
||
  fi | ||
|
||
  # Publish data to the Kafka topic. | ||
  title "Publishing NDJSON data to Kafka topic" | ||
  docker compose --file "${COMPOSE_FILE}" run --rm --no-TTY publish-data < nyc-yellow-taxi-2017-subset.ndjson | ||
  echo "Done." | ||
|
||
  # Wait a bit for the data to transfer and converge successfully. | ||
  sleep 3 | ||
} | ||
|
||
|
||
# --------- | ||
# Data sink | ||
# --------- | ||
|
||
function create-table() { | ||
  title "Creating CrateDB table" | ||
  docker compose --file "${COMPOSE_FILE}" run --rm create-table | ||
  echo "Done." | ||
} | ||
|
||
function display-data() { | ||
|
||
  title "Displaying data in CrateDB" | ||
|
||
  docker compose --file "${COMPOSE_FILE}" run --rm httpie \ | ||
    http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='REFRESH TABLE "kafka_demo";' --ignore-stdin > /dev/null | ||
|
||
  docker compose --file "${COMPOSE_FILE}" run --rm httpie \ | ||
    http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='SELECT * FROM "kafka_demo" LIMIT 5;' --ignore-stdin | ||
|
||
  docker compose --file "${COMPOSE_FILE}" run --rm httpie \ | ||
    http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='SELECT COUNT(*) FROM "kafka_demo";' --ignore-stdin | ||
|
||
} | ||
|
||
function verify-data() { | ||
  title "Verifying data in CrateDB" | ||
  size_reference=5000 | ||
  # Quote the jq filter so the shell cannot glob-expand the brackets. | ||
  size_actual=$( | ||
    docker compose --file "${COMPOSE_FILE}" run --rm httpie \ | ||
      http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='SELECT COUNT(*) FROM "kafka_demo";' --ignore-stdin \ | ||
      | jq '.rows[0][0]' | ||
  ) | ||
  if [[ "${size_actual}" = "${size_reference}" ]]; then | ||
    echo -e "${BGREEN}SUCCESS: Database table contains expected number of ${size_reference} records.${NC}" | ||
  else | ||
    echo -e "${BRED}ERROR: Expected database table to contain ${size_reference} records, but it contains ${size_actual} records.${NC}" | ||
    exit 2 | ||
  fi | ||
} | ||
Comment on lines +138 to +152
Backlog: Data validation is currently a bit thin, just counting the number of records and failing if it's not 5_000. Please expand the procedure, to also consider if data is in the right format within the target table. Also, why in hell is this written in Bash?
Reply: The joy of shell scripting? |
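A possible first step towards the broader validation requested in this thread is to check the target table's column names in addition to the row count. The sketch below is an assumption about how that could look, not code from the PR: `missing_columns` is a hypothetical helper, and the column names are placeholders rather than the actual schema of `kafka_demo`. In the real script, the actual column list could be taken from the `cols` array of the CrateDB `/_sql` response, for example with `jq -r '.cols[]'`.

```shell
#!/usr/bin/env bash
set -eu

# Print every expected column that is absent from the actual column list.
# $1: expected column names, one per line
# $2: actual column names, one per line
missing_columns() {
    local expected="$1" actual="$2" column
    while IFS= read -r column; do
        # Skip empty lines in the expected list.
        if [[ -z "${column}" ]]; then
            continue
        fi
        # -F: fixed string, -x: match the whole line, -q: no output.
        if ! grep -Fxq -- "${column}" <<< "${actual}"; then
            echo "${column}"
        fi
    done <<< "${expected}"
}

# Placeholder column names, for illustration only.
expected=$'vendor_id\ntotal_amount'
actual=$'vendor_id\npassenger_count'

missing="$(missing_columns "${expected}" "${actual}")"
if [[ -n "${missing}" ]]; then
    echo "Missing columns: ${missing}"
fi
```

Keeping the comparison in a small function makes it easy to fail the test run with a distinct exit code when the list of missing columns is not empty, in the same way `verify-data` does for the record count.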
||
|
||
function drop-table() { | ||
  title "Dropping CrateDB table" | ||
  docker compose --file "${COMPOSE_FILE}" run --rm drop-table | ||
  echo "Done." | ||
} | ||
|
||
|
||
# ------ | ||
# Recipe | ||
# ------ | ||
|
||
function main() { | ||
|
||
  # Start services and set up resources. | ||
  start-services | ||
  setup | ||
|
||
  # Acquire data, feed data to Kafka topic, invoke import job, | ||
  # and verify data has been stored into CrateDB. | ||
  feed-data | ||
  invoke-job | ||
  display-data | ||
  verify-data | ||
|
||
  # Clean up resources. | ||
  # teardown # Do not tear down, for inspection purposes. | ||
|
||
  # Stop services unless the `KEEPALIVE` environment variable is set. | ||
  # An explicit `if` keeps the exit status at zero when services are kept alive. | ||
  if [[ -z "${KEEPALIVE-}" ]]; then | ||
    stop-services | ||
  fi | ||
} | ||
|
||
function start_subcommand() { | ||
  if test -n "${SUBCOMMAND-}"; then | ||
    ${SUBCOMMAND-} | ||
    exit | ||
  fi | ||
} | ||
|
||
function start() { | ||
  #read_options | ||
  init_colors | ||
  start_subcommand | ||
  main | ||
} | ||
amotl marked this conversation as resolved.
|
||
|
||
start |
Nice to avoid bit rot - thanks
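The `SUBCOMMAND` variable read by `start_subcommand` makes it possible to run a single step in isolation, for instance `SUBCOMMAND=display-data bash test.sh`. The dispatch pattern can be sketched as follows; the step functions are hypothetical placeholders, and the `type -t` guard against unknown names is an addition that the script in this PR does not have:

```shell
#!/usr/bin/env bash
set -eu

# Hypothetical steps standing in for functions such as `display-data`.
step_one() { echo "one"; }
step_two() { echo "two"; }

# Run the function named by SUBCOMMAND, if any; otherwise run all steps.
dispatch() {
    if [[ -n "${SUBCOMMAND-}" ]]; then
        # Refuse names that are not defined shell functions.
        if [[ "$(type -t "${SUBCOMMAND}")" != "function" ]]; then
            echo "Unknown subcommand: ${SUBCOMMAND}" >&2
            return 1
        fi
        "${SUBCOMMAND}"
        return
    fi
    step_one
    step_two
}

dispatch
```

Rejecting unknown names up front gives a clearer error than letting the shell report a missing command.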