From 9a3e842fb0ca323625430298a2390ebb6ad9521d Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 7 Jul 2025 02:57:53 +0200 Subject: [PATCH] Ingestr: Add example using a Kafka data source --- .github/dependabot.yml | 10 ++ .github/workflows/application-ingestr.yml | 46 +++++ application/ingestr/.dlt/config.toml | 3 + application/ingestr/.env | 26 +++ application/ingestr/.gitignore | 7 + application/ingestr/README.md | 22 +++ application/ingestr/kafka-cmd.sh | 199 ++++++++++++++++++++++ application/ingestr/kafka-compose.yml | 156 +++++++++++++++++ application/ingestr/requirements.txt | 1 + application/ingestr/test.sh | 22 +++ application/ingestr/util.sh | 75 ++++++++ 11 files changed, 567 insertions(+) create mode 100644 .github/workflows/application-ingestr.yml create mode 100644 application/ingestr/.dlt/config.toml create mode 100644 application/ingestr/.env create mode 100644 application/ingestr/.gitignore create mode 100644 application/ingestr/README.md create mode 100755 application/ingestr/kafka-cmd.sh create mode 100644 application/ingestr/kafka-compose.yml create mode 100644 application/ingestr/requirements.txt create mode 100644 application/ingestr/test.sh create mode 100644 application/ingestr/util.sh diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 69d6b902..c9e74b9b 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -120,6 +120,16 @@ updates: schedule: interval: "daily" + - directory: "/application/ingestr" + package-ecosystem: "pip" + schedule: + interval: "daily" + + - directory: "/application/ingestr" + package-ecosystem: "docker-compose" + schedule: + interval: "daily" + - directory: "/application/metabase" package-ecosystem: "pip" schedule: diff --git a/.github/workflows/application-ingestr.yml b/.github/workflows/application-ingestr.yml new file mode 100644 index 00000000..a4ffe7fc --- /dev/null +++ b/.github/workflows/application-ingestr.yml @@ -0,0 +1,46 @@ +name: "Ingestr" + +on: + pull_request: + paths: + - 
'.github/workflows/application-ingestr.yml' + - 'application/ingestr/**' + push: + branches: [ main ] + paths: + - '.github/workflows/application-ingestr.yml' + - 'application/ingestr/**' + + # Allow job to be triggered manually. + workflow_dispatch: + + # Run job each night after CrateDB nightly has been published. + schedule: + - cron: '0 3 * * *' + +# Cancel in-progress jobs when pushing to the same branch. +concurrency: + cancel-in-progress: true + group: ${{ github.workflow }}-${{ github.ref }} + +jobs: + + test: + runs-on: ${{ matrix.os }} + + strategy: + fail-fast: true + matrix: + os: [ "ubuntu-latest" ] + + name: OS ${{ matrix.os }} + steps: + + - name: Acquire sources + uses: actions/checkout@v4 + + - name: Validate application/ingestr + run: | + # TODO: Generalize invocation into `ngr` test runner. + cd application/ingestr + bash test.sh diff --git a/application/ingestr/.dlt/config.toml b/application/ingestr/.dlt/config.toml new file mode 100644 index 00000000..f2b31185 --- /dev/null +++ b/application/ingestr/.dlt/config.toml @@ -0,0 +1,3 @@ +[runtime] + +dlthub_telemetry=false diff --git a/application/ingestr/.env b/application/ingestr/.env new file mode 100644 index 00000000..30b1ed03 --- /dev/null +++ b/application/ingestr/.env @@ -0,0 +1,26 @@ +# Software component versions. +CONFLUENT_VERSION=7.9.2 +CRATEDB_VERSION=5.10 +KCAT_VERSION=1.7.1 + +# Data source configuration (Kafka). +PORT_KAFKA_BROKER_INTERNAL=9092 +PORT_KAFKA_BROKER_EXTERNAL=9094 +PORT_KAFKA_ZOOKEEPER=2181 + +# Data sink configuration (CrateDB). +CRATEDB_HOST=${CRATEDB_HOST:-cratedb} +CRATEDB_HTTP_PORT=${CRATEDB_HTTP_PORT:-4200} +CRATEDB_POSTGRESQL_PORT=${CRATEDB_POSTGRESQL_PORT:-5432} +CRATEDB_USERNAME=${CRATEDB_USERNAME:-crate} +CRATEDB_PASSWORD=${CRATEDB_PASSWORD:-} +CRATEDB_HTTP_SCHEME=${CRATEDB_HTTP_SCHEME:-http} + +# Data sink configuration (CrateDB Cloud). 
+# CRATEDB_HTTP_SCHEME=https +# CRATEDB_HOST=example.aks1.westeurope.azure.cratedb.net +# CRATEDB_USERNAME='admin' +# CRATEDB_PASSWORD='' + +# Needs to be computed here. +CRATEDB_HTTP_URL="${CRATEDB_HTTP_SCHEME}://${CRATEDB_USERNAME}:${CRATEDB_PASSWORD}@${CRATEDB_HOST}:${CRATEDB_HTTP_PORT}" diff --git a/application/ingestr/.gitignore b/application/ingestr/.gitignore new file mode 100644 index 00000000..64ded041 --- /dev/null +++ b/application/ingestr/.gitignore @@ -0,0 +1,7 @@ +*.json +*.ndjson +*.tar.gz +*.jar +.venv* +__pycache__ +!.env \ No newline at end of file diff --git a/application/ingestr/README.md b/application/ingestr/README.md new file mode 100644 index 00000000..06da26c6 --- /dev/null +++ b/application/ingestr/README.md @@ -0,0 +1,22 @@ +# Use CrateDB with ingestr + +## About + +[ingestr] is a command-line application that allows copying data +from any source into any destination database. + +This folder includes runnable examples that use ingestr with CrateDB. +They are also used as integration tests to ensure software components +fit together well. + +## Usage + +To iterate quickly without tearing down the backend stack each time, +use the `KEEPALIVE` environment variable. +```shell +export KEEPALIVE=true +bash test.sh +``` + + +[ingestr]: https://bruin-data.github.io/ingestr/ diff --git a/application/ingestr/kafka-cmd.sh b/application/ingestr/kafka-cmd.sh new file mode 100755 index 00000000..18098f59 --- /dev/null +++ b/application/ingestr/kafka-cmd.sh @@ -0,0 +1,199 @@ +#!/usr/bin/env bash + +# ===== +# About +# ===== + +# End-to-end test feeding data through a pipeline implemented with Apache Kafka, +# ingestr, and CrateDB. The data source is a file in NDJSON format, the +# data sink is a database table in CrateDB.
+ + +# ============ +# Main program +# ============ + +set -eu + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +source "${SCRIPT_DIR}/.env" +source "${SCRIPT_DIR}/util.sh" + +COMPOSE_FILE="${SCRIPT_DIR}/kafka-compose.yml" + +# ---------------------------- +# Infrastructure and resources +# ---------------------------- + +function start-services() { + title "Starting services" + docker compose --file ${COMPOSE_FILE} up --detach +} + +function stop-services() { + title "Stopping services" + docker compose --file ${COMPOSE_FILE} down --remove-orphans +} + +function setup() { + delete-topic + # drop-table # Please do not drop tables with ingestr + # create-table # ingestr creates the table itself + create-topic +} + +function teardown() { + delete-topic + # drop-table # Please do not drop tables with ingestr +} + + +# ----------- +# Data source +# ----------- + +function create-topic() { + title "Creating Kafka topic" + docker compose --file ${COMPOSE_FILE} run --rm create-topic + echo "Done." +} + +function delete-topic() { + title "Deleting Kafka topic" + docker compose --file ${COMPOSE_FILE} run --rm --no-TTY delete-topic + echo "Done." +} + + +# -------- +# Pipeline +# -------- + +function invoke-job() { + + # Invoke ingestr job. + title "Invoking ingestr job" + + uvx --python="3.12" --prerelease="allow" --with-requirements="requirements.txt" ingestr ingest --yes \ + --source-uri "kafka://?bootstrap_servers=localhost:${PORT_KAFKA_BROKER_EXTERNAL}&group_id=test_group" \ + --source-table "demo" \ + --dest-uri "cratedb://crate:crate@localhost:5432/?sslmode=disable" \ + --dest-table "doc.kafka_demo" + + echo "Done." +} + +function feed-data() { + + if [[ ! -f nyc-yellow-taxi-2017-subset.ndjson ]]; then + + title "Acquiring NDJSON data" + + # Acquire NYC Taxi 2017 dataset in JSON format (~90 MB). 
+ wget --no-clobber --continue https://gist.githubusercontent.com/kovrus/328ba1b041dfbd89e55967291ba6e074/raw/7818724cb64a5d283db7f815737c9e198a22bee4/nyc-yellow-taxi-2017.tar.gz + + # Extract archive. + tar -xvf nyc-yellow-taxi-2017.tar.gz + + # Create a subset of the data (5000 records) for concluding the first steps. + cat nyc-yellow-taxi-2017.json | head -n 5000 > nyc-yellow-taxi-2017-subset.ndjson + + fi + + # Publish data to the Kafka topic. + title "Publishing NDJSON data to Kafka topic" + cat nyc-yellow-taxi-2017-subset.ndjson | docker compose --file ${COMPOSE_FILE} run --rm --no-TTY publish-data + echo "Done." + + # Wait a bit for the data to transfer and converge successfully. + sleep 3 +} + + +# --------- +# Data sink +# --------- + +function create-table() { + title "Creating CrateDB table" + docker compose --file ${COMPOSE_FILE} run --rm create-table + echo "Done." +} + +function display-data() { + + title "Displaying data in CrateDB" + + docker compose --file ${COMPOSE_FILE} run --rm httpie \ + http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='REFRESH TABLE "kafka_demo";' --ignore-stdin > /dev/null + + docker compose --file ${COMPOSE_FILE} run --rm httpie \ + http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='SELECT * FROM "kafka_demo" LIMIT 5;' --ignore-stdin + + docker compose --file ${COMPOSE_FILE} run --rm httpie \ + http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='SELECT COUNT(*) FROM "kafka_demo";' --ignore-stdin + +} + +function verify-data() { + title "Verifying data in CrateDB" + size_reference=5000 + size_actual=$( + docker compose --file ${COMPOSE_FILE} run --rm httpie \ + http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='SELECT COUNT(*) FROM "kafka_demo";' --ignore-stdin \ + | jq .rows[0][0] + ) + if [[ "${size_actual}" = "${size_reference}" ]]; then + echo -e "${BGREEN}SUCCESS: Database table contains expected number of ${size_reference} records.${NC}" + else + echo -e "${BRED}ERROR: Expected database table to contain ${size_reference} records, but it 
contains ${size_actual} records.${NC}" + exit 2 + fi +} + +function drop-table() { + title "Dropping CrateDB table" + docker compose --file ${COMPOSE_FILE} run --rm drop-table + echo "Done." +} + + +# ------ +# Recipe +# ------ + +function main() { + + # Start services and setup resources. + start-services + setup + + # Acquire data, feed data to Kafka topic, invoke import job, + # and verify data has been stored into CrateDB. + feed-data + invoke-job + display-data + verify-data + + # Clean up resources. + # teardown # Do not tear down, for inspection purposes. + + # Tear down only when `--keepalive` option has not been given. + test -z "${KEEPALIVE-}" && stop-services +} + +function start_subcommand() { + if test -n "${SUBCOMMAND-}"; then + ${SUBCOMMAND-} + exit + fi +} + +function start() { + #read_options + init_colors + start_subcommand + main +} + +start diff --git a/application/ingestr/kafka-compose.yml b/application/ingestr/kafka-compose.yml new file mode 100644 index 00000000..f721d297 --- /dev/null +++ b/application/ingestr/kafka-compose.yml @@ -0,0 +1,156 @@ +networks: + ingestr-demo: + name: ingestr-demo + driver: bridge + +services: + + # --------------- + # Confluent Kafka + # --------------- + # https://docs.confluent.io/platform/current/installation/docker/config-reference.html + # https://gist.github.com/everpeace/7a317860cab6c7fb39d5b0c13ec2543e + # https://github.com/framiere/a-kafka-story/blob/master/step14/docker-compose.yml + kafka-zookeeper: + image: confluentinc/cp-zookeeper:${CONFLUENT_VERSION} + environment: + ZOOKEEPER_CLIENT_PORT: ${PORT_KAFKA_ZOOKEEPER} + KAFKA_OPTS: -Dzookeeper.4lw.commands.whitelist=ruok + networks: + - ingestr-demo + + # Define health check for Zookeeper. 
+ healthcheck: + # https://github.com/confluentinc/cp-docker-images/issues/827 + test: ["CMD", "bash", "-c", "echo ruok | nc localhost ${PORT_KAFKA_ZOOKEEPER} | grep imok"] + start_period: 3s + interval: 2s + timeout: 30s + retries: 60 + + kafka-broker: + image: confluentinc/cp-kafka:${CONFLUENT_VERSION} + ports: + - "${PORT_KAFKA_BROKER_INTERNAL}:${PORT_KAFKA_BROKER_INTERNAL}" + - "${PORT_KAFKA_BROKER_EXTERNAL}:${PORT_KAFKA_BROKER_EXTERNAL}" + environment: + KAFKA_ZOOKEEPER_CONNECT: kafka-zookeeper:${PORT_KAFKA_ZOOKEEPER} + KAFKA_LISTENERS: INTERNAL://0.0.0.0:${PORT_KAFKA_BROKER_INTERNAL},EXTERNAL://0.0.0.0:${PORT_KAFKA_BROKER_EXTERNAL} + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-broker:${PORT_KAFKA_BROKER_INTERNAL},EXTERNAL://localhost:${PORT_KAFKA_BROKER_EXTERNAL} + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + depends_on: + - kafka-zookeeper + networks: + - ingestr-demo + + # Define health check for Kafka broker. + healthcheck: + #test: ps augwwx | egrep "kafka.Kafka" + test: ["CMD", "nc", "-vz", "localhost", "${PORT_KAFKA_BROKER_INTERNAL}"] + start_period: 3s + interval: 0.5s + timeout: 30s + retries: 60 + + # ------- + # CrateDB + # ------- + cratedb: + image: crate:${CRATEDB_VERSION} + ports: + - "${CRATEDB_HTTP_PORT}:${CRATEDB_HTTP_PORT}" + - "${CRATEDB_POSTGRESQL_PORT}:${CRATEDB_POSTGRESQL_PORT}" + environment: + CRATE_HEAP_SIZE: 4g + + command: ["crate", + "-Cdiscovery.type=single-node", + "-Ccluster.routing.allocation.disk.threshold_enabled=false", + ] + networks: + - ingestr-demo + + # Define health check for CrateDB. 
+ healthcheck: + test: [ "CMD", "curl", "--fail", "http://localhost:${CRATEDB_HTTP_PORT}" ] + start_period: 3s + interval: 0.5s + timeout: 30s + retries: 60 + + + # ------- + # Bundler + # ------- + # Wait for all defined services to be fully available by probing their health + # status, even when using `docker compose up --detach`. + # https://marcopeg.com/2019/docker-compose-healthcheck/ + start-dependencies: + image: dadarek/wait-for-dependencies + depends_on: + kafka-broker: + condition: service_healthy + cratedb: + condition: service_healthy + + + # ----- + # Tasks + # ----- + + # Create database table in CrateDB. + create-table: + image: westonsteimel/httpie + networks: [ingestr-demo] + command: http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='CREATE TABLE "kafka_demo" ("payload" OBJECT(DYNAMIC))' --ignore-stdin + deploy: + replicas: 0 + + # Create Kafka topic. + create-topic: + image: confluentinc/cp-kafka:${CONFLUENT_VERSION} + networks: [ingestr-demo] + command: kafka-topics --bootstrap-server kafka-broker:${PORT_KAFKA_BROKER_INTERNAL} --create --if-not-exists --replication-factor 1 --partitions 1 --topic demo + deploy: + replicas: 0 + + # Delete Kafka topic. + delete-topic: + image: confluentinc/cp-kafka:${CONFLUENT_VERSION} + networks: [ingestr-demo] + command: kafka-topics --bootstrap-server kafka-broker:${PORT_KAFKA_BROKER_INTERNAL} --delete --if-exists --topic demo + deploy: + replicas: 0 + + # Drop database table in CrateDB. + drop-table: + image: westonsteimel/httpie + networks: [ingestr-demo] + command: http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='DROP TABLE IF EXISTS "kafka_demo"' --ignore-stdin + deploy: + replicas: 0 + + # Invoke HTTPie via Docker. + httpie: + image: westonsteimel/httpie + networks: [ingestr-demo] + deploy: + replicas: 0 + + # Publish data to Kafka topic. 
+ publish-data: + image: confluentinc/cp-kafka:${CONFLUENT_VERSION} + networks: [ingestr-demo] + command: kafka-console-producer --bootstrap-server kafka-broker:${PORT_KAFKA_BROKER_INTERNAL} --topic demo + deploy: + replicas: 0 + + # Subscribe to Kafka topic. + subscribe-topic: + image: edenhill/kcat:${KCAT_VERSION} + networks: [ingestr-demo] + command: kcat -b kafka-broker -C -t demo # -o end + deploy: + replicas: 0 diff --git a/application/ingestr/requirements.txt b/application/ingestr/requirements.txt new file mode 100644 index 00000000..b79e0792 --- /dev/null +++ b/application/ingestr/requirements.txt @@ -0,0 +1 @@ +ingestr @ git+https://github.com/crate-workbench/ingestr.git@next diff --git a/application/ingestr/test.sh b/application/ingestr/test.sh new file mode 100644 index 00000000..d7a5cf50 --- /dev/null +++ b/application/ingestr/test.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash + +# ingestr integration test backplane. + +set -eu + +export INGESTR_DISABLE_TELEMETRY=true + +# Install `uv`. +function setup() { + if ! command -v uv >/dev/null 2>&1; then + pip install uv + fi +} + +# Invoke Kafka tests. +function test_kafka() { + bash kafka-cmd.sh +} + +setup +test_kafka diff --git a/application/ingestr/util.sh b/application/ingestr/util.sh new file mode 100644 index 00000000..c09798c8 --- /dev/null +++ b/application/ingestr/util.sh @@ -0,0 +1,75 @@ +#!/usr/bin/env bash + +# ================= +# Utility functions +# ================= + +function read_options() { + + local options + + # https://dustymabe.com/2013/05/17/easy-getopt-for-a-bash-script/ + options=$(getopt --options k --longoptions keepalive -- "$@") + + # Call getopt to validate the provided input. + [ $? 
-eq 0 ] || { + echo "Incorrect options provided" + exit 1 + } + eval set -- "$options" + while true; do + case "$1" in + -k) + KEEPALIVE=true + shift + continue + ;; + --keepalive) + KEEPALIVE=true + shift + continue + ;; + --) + shift + break + ;; + esac + shift + done + SUBCOMMAND=$1 + +} + +function title() { + text=$1 + len=${#text} + guard=$(printf "%${len}s" | sed 's/ /=/g') + echo + echo ${guard} + echo -e "${BYELLOW}${text}${NC}" + echo ${guard} +} + +function init_colors() { + # Reset + NC='\033[0m' + + # Regular + RED='\033[0;31m' + GREEN='\033[0;32m' + YELLOW='\033[0;33m' + BLUE='\033[0;34m' + PURPLE='\033[0;35m' + CYAN='\033[0;36m' + WHITE='\033[0;37m' + + # Bold + BBLACK='\033[1;30m' + BRED='\033[1;31m' + BGREEN='\033[1;32m' + BYELLOW='\033[1;33m' + BBLUE='\033[1;34m' + BPURPLE='\033[1;35m' + BCYAN='\033[1;36m' + BWHITE='\033[1;37m' +}