#!/usr/bin/env bash

# =====
# About
# =====

# End-to-end test feeding data through a pipeline implemented with Apache Kafka,
# ingestr, and CrateDB. The data source is a file in NDJSON format, and the
# data sink is a database table in CrateDB.
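
# Usage (the script file name below is illustrative; adjust to the actual name):
#
#   # Run the full recipe, tearing down services afterwards.
#   ./test.sh
#
#   # Keep services running after the test, for inspection purposes.
#   KEEPALIVE=true ./test.sh
#
#   # Run a single step only, for example just displaying the data in CrateDB.
#   SUBCOMMAND=display-data ./test.sh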


# ============
# Main program
# ============

set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
source "${SCRIPT_DIR}/.env"
source "${SCRIPT_DIR}/util.sh"

COMPOSE_FILE=kafka-compose.yml

# ----------------------------
# Infrastructure and resources
# ----------------------------

function start-services() {
    title "Starting services"
    docker compose --file ${COMPOSE_FILE} up --detach
}

function stop-services() {
    title "Stopping services"
    docker compose --file ${COMPOSE_FILE} down --remove-orphans
}
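
# Troubleshooting hint: when a step fails, the service logs are the first
# place to look. One way to tail them, assuming the same compose file:
#
#   docker compose --file kafka-compose.yml logs --follow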

function setup() {
    delete-topic
    # drop-table # Please do not drop tables with ingestr
    # create-table # ingestr creates the table itself
    create-topic
}

function teardown() {
    delete-topic
    # drop-table # Please do not drop tables with ingestr
}


# -----------
# Data source
# -----------

function create-topic() {
    title "Creating Kafka topic"
    docker compose --file ${COMPOSE_FILE} run --rm create-topic
    echo "Done."
}

function delete-topic() {
    title "Deleting Kafka topic"
    docker compose --file ${COMPOSE_FILE} run --rm --no-TTY delete-topic
    echo "Done."
}
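
# Optional manual check, not part of the recipe: list topics on the broker
# from the host, assuming the `kcat` (formerly kafkacat) CLI is installed
# locally.
#
#   kcat -b "localhost:${PORT_KAFKA_BROKER_EXTERNAL}" -L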


# --------
# Pipeline
# --------

function invoke-job() {

    # Invoke ingestr job.
    title "Invoking ingestr job"

    uvx --python="3.12" --prerelease="allow" --with-requirements="requirements.txt" ingestr ingest --yes \
        --source-uri "kafka://?bootstrap_servers=localhost:${PORT_KAFKA_BROKER_EXTERNAL}&group_id=test_group" \
        --source-table "demo" \
        --dest-uri "cratedb://crate:crate@localhost:5432/?sslmode=disable" \
        --dest-table "doc.kafka_demo"

    echo "Done."
}
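
# Notes on the invocation above, to the best of our understanding:
# - `uvx` runs ingestr in an ephemeral environment; installing it permanently
#   (e.g. `pip install ingestr`) and calling `ingestr ingest ...` directly
#   should behave the same way.
# - With the Kafka source, `--source-table` designates the topic to consume
#   from, and `group_id` is the consumer group used to track offsets.
# - The CrateDB destination speaks the PostgreSQL wire protocol, hence the
#   `crate:crate@localhost:5432` coordinates in `--dest-uri`.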

function feed-data() {

    if [[ ! -f nyc-yellow-taxi-2017-subset.ndjson ]]; then

        title "Acquiring NDJSON data"

        # Acquire the NYC Yellow Taxi 2017 dataset in JSON format (~90 MB).
        wget --no-clobber --continue https://gist.githubusercontent.com/kovrus/328ba1b041dfbd89e55967291ba6e074/raw/7818724cb64a5d283db7f815737c9e198a22bee4/nyc-yellow-taxi-2017.tar.gz

        # Extract archive.
        tar -xvf nyc-yellow-taxi-2017.tar.gz

        # Create a subset of the data (5000 records), sufficient for a first test run.
        head -n 5000 nyc-yellow-taxi-2017.json > nyc-yellow-taxi-2017-subset.ndjson

    fi

    # Publish data to the Kafka topic.
    title "Publishing NDJSON data to Kafka topic"
    docker compose --file ${COMPOSE_FILE} run --rm --no-TTY publish-data < nyc-yellow-taxi-2017-subset.ndjson
    echo "Done."

    # Wait a bit for the data to transfer and converge successfully.
    sleep 3
}
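
# Optional sanity check: the subset file should contain exactly 5000 lines,
# one JSON document per line, matching the reference count used by `verify-data`.
#
#   wc -l < nyc-yellow-taxi-2017-subset.ndjson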


# ---------
# Data sink
# ---------

function create-table() {
    title "Creating CrateDB table"
    docker compose --file ${COMPOSE_FILE} run --rm create-table
    echo "Done."
}

function display-data() {

    title "Displaying data in CrateDB"

    # Synchronize write operations: newly written records may only become
    # visible to queries after a table refresh.
    docker compose --file ${COMPOSE_FILE} run --rm httpie \
        http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='REFRESH TABLE "kafka_demo";' --ignore-stdin > /dev/null

    docker compose --file ${COMPOSE_FILE} run --rm httpie \
        http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='SELECT * FROM "kafka_demo" LIMIT 5;' --ignore-stdin

    docker compose --file ${COMPOSE_FILE} run --rm httpie \
        http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='SELECT COUNT(*) FROM "kafka_demo";' --ignore-stdin

}

function verify-data() {
    title "Verifying data in CrateDB"
    size_reference=5000
    size_actual=$(
        docker compose --file ${COMPOSE_FILE} run --rm httpie \
            http "${CRATEDB_HTTP_URL}/_sql?pretty" stmt='SELECT COUNT(*) FROM "kafka_demo";' --ignore-stdin \
            | jq '.rows[0][0]'
    )
    if [[ "${size_actual}" = "${size_reference}" ]]; then
        echo -e "${BGREEN}SUCCESS: Database table contains the expected number of ${size_reference} records.${NC}"
    else
        echo -e "${BRED}ERROR: Expected database table to contain ${size_reference} records, but it contains ${size_actual} records.${NC}"
        exit 2
    fi
}
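
# For reference, the CrateDB HTTP endpoint returns a JSON document along the
# lines of the following (abridged), which is why `jq '.rows[0][0]'` yields
# the record count:
#
#   {"cols": ["count(*)"], "rows": [[5000]], "rowcount": 1, "duration": ...}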

function drop-table() {
    title "Dropping CrateDB table"
    docker compose --file ${COMPOSE_FILE} run --rm drop-table
    echo "Done."
}


# ------
# Recipe
# ------

function main() {

    # Start services and set up resources.
    start-services
    setup

    # Acquire data, feed data to the Kafka topic, invoke the import job,
    # and verify the data has been stored in CrateDB.
    feed-data
    invoke-job
    display-data
    verify-data

    # Clean up resources.
    # teardown # Do not tear down, for inspection purposes.

    # Tear down only when keep-alive has not been requested
    # (KEEPALIVE environment variable).
    if [[ -z "${KEEPALIVE-}" ]]; then
        stop-services
    fi
}

function start_subcommand() {
    if test -n "${SUBCOMMAND-}"; then
        ${SUBCOMMAND-}
        exit
    fi
}

function start() {
    #read_options
    init_colors
    start_subcommand
    main
}

start