
Ingestr: Add runnable example loading from Kafka #1013


Status: Open · wants to merge 1 commit into base `main`

10 changes: 10 additions & 0 deletions .github/dependabot.yml
@@ -120,6 +120,16 @@ updates:
schedule:
interval: "daily"

- directory: "/application/ingestr"
package-ecosystem: "pip"
schedule:
interval: "daily"

- directory: "/application/ingestr"
package-ecosystem: "docker-compose"
schedule:
interval: "daily"

- directory: "/application/metabase"
package-ecosystem: "pip"
schedule:
46 changes: 46 additions & 0 deletions .github/workflows/application-ingestr.yml
@@ -0,0 +1,46 @@
name: "Ingestr"

on:
pull_request:
paths:
- '.github/workflows/application-ingestr.yml'
- 'application/ingestr/**'
push:
branches: [ main ]
paths:
- '.github/workflows/application-ingestr.yml'
- 'application/ingestr/**'

# Allow job to be triggered manually.
workflow_dispatch:

# Run job each night after CrateDB nightly has been published.
schedule:
- cron: '0 3 * * *'
Contributor commented:
Nice to avoid bit rot - thanks


# Cancel in-progress jobs when pushing to the same branch.
concurrency:
cancel-in-progress: true
group: ${{ github.workflow }}-${{ github.ref }}

jobs:

test:
runs-on: ${{ matrix.os }}

strategy:
fail-fast: true
matrix:
os: [ "ubuntu-latest" ]

name: OS ${{ matrix.os }}
steps:

- name: Acquire sources
uses: actions/checkout@v4

- name: Validate application/ingestr
run: |
# TODO: Generalize invocation into `ngr` test runner.
cd application/ingestr
bash test.sh
3 changes: 3 additions & 0 deletions application/ingestr/.dlt/config.toml
@@ -0,0 +1,3 @@
[runtime]

dlthub_telemetry=false
26 changes: 26 additions & 0 deletions application/ingestr/.env
@@ -0,0 +1,26 @@
# Software component versions.
CONFLUENT_VERSION=7.9.2
CRATEDB_VERSION=5.10
KCAT_VERSION=1.7.1

# Data source configuration (Kafka).
PORT_KAFKA_BROKER_INTERNAL=9092
PORT_KAFKA_BROKER_EXTERNAL=9094
PORT_KAFKA_ZOOKEEPER=2181

# Data sink configuration (CrateDB).
CRATEDB_HOST=${CRATEDB_HOST:-cratedb}
CRATEDB_HTTP_PORT=${CRATEDB_HTTP_PORT:-4200}
CRATEDB_POSTGRESQL_PORT=${CRATEDB_POSTGRESQL_PORT:-5432}
CRATEDB_USERNAME=${CRATEDB_USERNAME:-crate}
CRATEDB_PASSWORD=${CRATEDB_PASSWORD:-}
CRATEDB_HTTP_SCHEME=${CRATEDB_HTTP_SCHEME:-http}

# Data sink configuration (CrateDB Cloud).
# CRATEDB_HTTP_SCHEME=https
# CRATEDB_HOST=example.aks1.westeurope.azure.cratedb.net
# CRATEDB_USERNAME='admin'
# CRATEDB_PASSWORD='<PASSWORD>'

# Computed here from the settings above.
CRATEDB_HTTP_URL="${CRATEDB_HTTP_SCHEME}://${CRATEDB_USERNAME}:${CRATEDB_PASSWORD}@${CRATEDB_HOST}:${CRATEDB_HTTP_PORT}"
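
# The `${VAR:-default}` syntax keeps values that are already present in the
# environment, so individual settings can be overridden before sourcing.
# Sketch, using the placeholder CrateDB Cloud coordinates from above:
#   export CRATEDB_HTTP_SCHEME=https
#   export CRATEDB_HOST=example.aks1.westeurope.azure.cratedb.net
#   export CRATEDB_PASSWORD='<PASSWORD>'
#   sh test.sh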
7 changes: 7 additions & 0 deletions application/ingestr/.gitignore
@@ -0,0 +1,7 @@
*.json
*.ndjson
*.tar.gz
*.jar
.venv*
__pycache__
!.env
22 changes: 22 additions & 0 deletions application/ingestr/README.md
@@ -0,0 +1,22 @@
# Use CrateDB with ingestr

## About

[ingestr] is a command-line application for copying data
from any source into any destination database.

This folder includes runnable examples that use ingestr with CrateDB.
They are also used as integration tests to ensure software components
fit together well.
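
A typical invocation copies a Kafka topic into a CrateDB table. This sketch
mirrors the command used by `kafka-cmd.sh` in this folder; host names, ports,
and credentials follow the defaults from the bundled `.env` file.
```shell
ingestr ingest --yes \
  --source-uri "kafka://?bootstrap_servers=localhost:9094&group_id=test_group" \
  --source-table "demo" \
  --dest-uri "cratedb://crate:crate@localhost:5432/?sslmode=disable" \
  --dest-table "doc.kafka_demo"
```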

## Usage

To iterate quickly without tearing down the backend stack each time,
set the `KEEPALIVE` environment variable.
```shell
export KEEPALIVE=true
sh test.sh
```
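
Individual steps can also be run in isolation through the `SUBCOMMAND`
environment variable, which `kafka-cmd.sh` evaluates at startup. For example,
to re-display data already stored in CrateDB:
```shell
SUBCOMMAND=display-data bash kafka-cmd.sh
```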


[ingestr]: https://bruin-data.github.io/ingestr/
Comment on lines +1 to +22
@amotl (Member, Author) commented on Jul 8, 2025:
Backlog:

  • Add "What's inside" section, to educate readers about the bunch of files presented here, and how to use them.
  • Add "Synopsis" section including a typical ingestr ingest command to quickly present what it is actually all about.

199 changes: 199 additions & 0 deletions application/ingestr/kafka-cmd.sh
@@ -0,0 +1,199 @@
#!/usr/bin/env bash

# =====
# About
# =====

# End-to-end test feeding data through a pipeline implemented with Apache Kafka,
# ingestr, and CrateDB. The data source is a file in NDJSON format, the
# data sink is a database table in CrateDB.
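#
# Data flow: NDJSON file -> Kafka topic "demo" -> ingestr -> CrateDB table
# "doc.kafka_demo".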


# ============
# Main program
# ============

set -eu

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
source "${SCRIPT_DIR}/.env"
source "${SCRIPT_DIR}/util.sh"

COMPOSE_FILE="${SCRIPT_DIR}/kafka-compose.yml"

# ----------------------------
# Infrastructure and resources
# ----------------------------

function start-services() {
title "Starting services"
docker compose --file ${COMPOSE_FILE} up --detach
}

function stop-services() {
title "Stopping services"
docker compose --file ${COMPOSE_FILE} down --remove-orphans
}

function setup() {
delete-topic
# drop-table # Please do not drop tables with ingestr
# create-table # ingestr creates the table itself
create-topic
}

function teardown() {
delete-topic
# drop-table # Please do not drop tables with ingestr
}


# -----------
# Data source
# -----------

function create-topic() {
title "Creating Kafka topic"
docker compose --file ${COMPOSE_FILE} run --rm create-topic
echo "Done."
}

function delete-topic() {
title "Deleting Kafka topic"
docker compose --file ${COMPOSE_FILE} run --rm --no-TTY delete-topic
echo "Done."
}


# --------
# Pipeline
# --------

function invoke-job() {

# Invoke ingestr job.
title "Invoking ingestr job"

uvx --python="3.12" --prerelease="allow" --with-requirements="requirements.txt" ingestr ingest --yes \
--source-uri "kafka://?bootstrap_servers=localhost:${PORT_KAFKA_BROKER_EXTERNAL}&group_id=test_group" \
--source-table "demo" \
--dest-uri "cratedb://crate:crate@localhost:5432/?sslmode=disable" \
--dest-table "doc.kafka_demo"

echo "Done."
}

function feed-data() {

if [[ ! -f nyc-yellow-taxi-2017-subset.ndjson ]]; then

title "Acquiring NDJSON data"

# Acquire NYC Taxi 2017 dataset in JSON format (~90 MB).
wget --no-clobber --continue https://gist.githubusercontent.com/kovrus/328ba1b041dfbd89e55967291ba6e074/raw/7818724cb64a5d283db7f815737c9e198a22bee4/nyc-yellow-taxi-2017.tar.gz

# Extract archive.
tar -xvf nyc-yellow-taxi-2017.tar.gz

# Create a subset of the data (5000 records) to keep the first steps quick.
cat nyc-yellow-taxi-2017.json | head -n 5000 > nyc-yellow-taxi-2017-subset.ndjson

fi

# Publish data to the Kafka topic.
title "Publishing NDJSON data to Kafka topic"
cat nyc-yellow-taxi-2017-subset.ndjson | docker compose --file ${COMPOSE_FILE} run --rm --no-TTY publish-data
echo "Done."

# Wait a bit for the data to transfer and converge successfully.
sleep 3
}


# ---------
# Data sink
# ---------

function create-table() {
title "Creating CrateDB table"
docker compose --file ${COMPOSE_FILE} run --rm create-table
echo "Done."
}

function display-data() {

title "Displaying data in CrateDB"

docker compose --file ${COMPOSE_FILE} run --rm httpie \
http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='REFRESH TABLE "kafka_demo";' --ignore-stdin > /dev/null

docker compose --file ${COMPOSE_FILE} run --rm httpie \
http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='SELECT * FROM "kafka_demo" LIMIT 5;' --ignore-stdin

docker compose --file ${COMPOSE_FILE} run --rm httpie \
http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='SELECT COUNT(*) FROM "kafka_demo";' --ignore-stdin

}

function verify-data() {
title "Verifying data in CrateDB"
size_reference=5000
size_actual=$(
docker compose --file ${COMPOSE_FILE} run --rm httpie \
http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='SELECT COUNT(*) FROM "kafka_demo";' --ignore-stdin \
| jq .rows[0][0]
)
if [[ "${size_actual}" = "${size_reference}" ]]; then
echo -e "${BGREEN}SUCCESS: Database table contains expected number of ${size_reference} records.${NC}"
else
echo -e "${BRED}ERROR: Expected database table to contain ${size_reference} records, but it contains ${size_actual} records.${NC}"
exit 2
fi
}
Comment on lines +138 to +152
@amotl (Member, Author) commented:
Backlog: Data validation is currently a bit thin, just counting the number of records and failing if it's not 5_000. Please expand the procedure, to also consider if data is in the right format within the target table. Also, why in hell is this written in Bash?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The joy of shell scripting?
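
A hedged sketch of one possible deeper check, verifying column names and types
through `information_schema` with the same httpie/jq tooling (not part of this
patch):
```shell
docker compose --file ${COMPOSE_FILE} run --rm httpie \
http "${CRATEDB_HTTP_URL}/_sql" \
stmt="SELECT column_name, data_type FROM information_schema.columns WHERE table_name = 'kafka_demo';" \
--ignore-stdin | jq .rows
```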


function drop-table() {
title "Dropping CrateDB table"
docker compose --file ${COMPOSE_FILE} run --rm drop-table
echo "Done."
}


# ------
# Recipe
# ------

function main() {

# Start services and setup resources.
start-services
setup

# Acquire data, feed data to Kafka topic, invoke import job,
# and verify data has been stored into CrateDB.
feed-data
invoke-job
display-data
verify-data

# Clean up resources.
# teardown # Do not tear down, for inspection purposes.

# Tear down only when the `KEEPALIVE` environment variable is not set.
test -n "${KEEPALIVE-}" || stop-services
}

function start_subcommand() {
if test -n "${SUBCOMMAND-}"; then
${SUBCOMMAND-}
exit
fi
}

function start() {
#read_options
init_colors
start_subcommand
main
}

start