A Kafka demo application, using Aiven, which:
- Creates a producer
- Generates fake testing data
- Publishes data to a Kafka topic
- Consumes the data with a Kafka consumer and inserts it into a Postgres DB
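The producer's fake-data step can be sketched roughly as follows. The function and field names here are illustrative assumptions (the real project may use a library such as Faker), and the kafka-python calls in the trailing comment only indicate how the SSL-secured send would typically look:

```python
import json
import random
import uuid

# Illustrative stand-ins for the demo's fake data; field names are assumptions.
FIRST_NAMES = ["Ada", "Grace", "Alan", "Edsger"]

def generate_fake_profile() -> dict:
    """Build one fake customer profile to publish to Kafka."""
    return {
        "id": str(uuid.uuid4()),
        "name": random.choice(FIRST_NAMES),
        "email_verified": random.choice([True, False]),
    }

def serialize(profile: dict) -> bytes:
    """Kafka message values are bytes, so encode the profile as UTF-8 JSON."""
    return json.dumps(profile).encode("utf-8")

if __name__ == "__main__":
    # With a live Aiven Kafka service, the send would use kafka-python,
    # roughly like this (URIs and paths come from the env vars listed below):
    #   from kafka import KafkaProducer
    #   producer = KafkaProducer(
    #       bootstrap_servers=SERVICE_URI,
    #       security_protocol="SSL",
    #       ssl_cafile=CA_PATH,
    #       ssl_keyfile=KEY_PATH,
    #       ssl_certfile=CERT_PATH,
    #   )
    #   producer.send("sample_customer_profile", serialize(profile))
    print(serialize(generate_fake_profile()))
```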
Using requirements.txt and pip
pip install --user --requirement requirements.txt
Using Poetry (recommended method)
poetry install
Note: This demo is not yet on PyPI, so install its dependencies from the repository root with poetry install rather than poetry add aiven_demo
- Python 3.9
- Kafka 2.6 (using Aiven)
- Postgres DB 12 (using Aiven)
All of these env vars are optional. Please see step 1 of Running the application for other ways of setting them.
KAFKA_AIVEN_SERVICE_URI
-> service URI of your Kafka service hosted by Aiven
KAFKA_AIVEN_CA_PATH
-> CA path of your Kafka service hosted by Aiven
KAFKA_AIVEN_KEY_PATH
-> key path of your Kafka service hosted by Aiven
KAFKA_AIVEN_CERT_PATH
-> cert path of your Kafka service hosted by Aiven
POSTGRES_AIVEN_DB_URI
-> service URI of your Postgres DB service hosted by Aiven
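Setting them in a shell could look like this (all values are placeholders; copy the real ones from your Aiven console):

```shell
# Placeholder values only -- substitute the URIs and file paths for your
# own Aiven services (see the Overview page in the Aiven console).
export KAFKA_AIVEN_SERVICE_URI="kafka-demo.example.aivencloud.com:12345"
export KAFKA_AIVEN_CA_PATH="$HOME/aiven/ca.pem"
export KAFKA_AIVEN_KEY_PATH="$HOME/aiven/service.key"
export KAFKA_AIVEN_CERT_PATH="$HOME/aiven/service.cert"
export POSTGRES_AIVEN_DB_URI="postgres://user:password@pg-demo.example.aivencloud.com:12345/defaultdb?sslmode=require"

# PYTHONPATH may also need to point at the repo root:
export PYTHONPATH="$(pwd)"
```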
Note that you might have to set the PYTHONPATH
environment variable to the root of this folder
This app assumes you have an Aiven Kafka and Postgres service created.
- Configure the environment variables in the first few lines of the
Makefile
SERVICE_URI := $(or $(KAFKA_AIVEN_SERVICE_URI), "manual-input-here")
CA_PATH := $(or $(KAFKA_AIVEN_CA_PATH), "manual-input-here")
KEY_PATH := $(or $(KAFKA_AIVEN_KEY_PATH), "manual-input-here")
CERT_PATH := $(or $(KAFKA_AIVEN_CERT_PATH), "manual-input-here")
DB_URI := $(or $(POSTGRES_AIVEN_DB_URI), "manual-input-here")
All variables beginning with KAFKA_AIVEN or POSTGRES_AIVEN are environment variables that you can set before running. Alternatively, you can set the values by replacing the manual-input-here placeholder text.
You can find the respective values in the Overview section of the Aiven console for your Postgres and Kafka services.
- Start the Kafka consumer:
  make consumer start
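Under the hood, the consumer's message-to-row step might look roughly like this. The table and column names are assumptions about the demo's schema, and the psycopg2 calls in the comment are only indicative:

```python
import json

def parse_message(raw: bytes) -> tuple:
    """Turn a raw Kafka message value into a row tuple for the INSERT.

    The field names here are illustrative assumptions about the fake data.
    """
    record = json.loads(raw.decode("utf-8"))
    return (record["id"], record["name"])

# Hypothetical target table; the real schema is defined by the demo's
# Postgres setup script.
INSERT_SQL = "INSERT INTO customer_profile (id, name) VALUES (%s, %s)"

# With a live connection, each consumed message would be inserted roughly as:
#   cursor.execute(INSERT_SQL, parse_message(message.value))
#   connection.commit()
```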
- Start the Kafka producer:
  make producer start
Optionally, the Makefile comes with other functionality such as linting, type checking, and running tests
- The default topic is sample_customer_profile. It can be changed with the
  --topic-name
  option within the CLI task.
- TODO: add more Kafka default options here
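If the CLI task were built with argparse, the --topic-name option could look like this. This is only a sketch (the real task may use another framework); only the default topic name comes from this README:

```python
import argparse

def build_parser() -> argparse.ArgumentParser:
    """Hypothetical CLI parser for the producer/consumer tasks."""
    parser = argparse.ArgumentParser(description="Run a Kafka producer or consumer")
    parser.add_argument(
        "--topic-name",
        default="sample_customer_profile",  # default stated in this README
        help="Kafka topic to publish to or consume from",
    )
    return parser

if __name__ == "__main__":
    args = build_parser().parse_args()
    print(args.topic_name)
```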
No tests created yet.
Ideally, I'd like to use pytest for unit tests:
- Testing the main functions in the consumer, producer, and Postgres setup Python files
Integration tests:
- Sending data from producer to consumer
- Sending data from the consumer to the Postgres DB
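A first pytest unit test for the producer side could look roughly like this; serialize is a hypothetical helper name standing in for whatever the producer actually uses:

```python
import json

def serialize(profile: dict) -> bytes:
    """Stand-in for the producer's serialization helper (name assumed)."""
    return json.dumps(profile).encode("utf-8")

def test_serialize_round_trips():
    # A message should decode back to the exact profile that was sent.
    profile = {"id": "123", "name": "Ada", "email_verified": True}
    assert json.loads(serialize(profile)) == profile
```

With pytest added as a dev dependency, this would run via poetry run pytest.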
I used a variety of online resources to help me build this:
- the kafka/python repo from aiven-examples; the consumer and producer were largely based on it
- Hackers and Slackers, for learning more about the Poetry package manager
- the kafka-python repository and its documentation
- the aiven-kafka demo, for inspiration for the Kafka-to-Postgres connection
Things that I'd like to improve in the future:
- Allow users to choose their Postgres schema
- Add testing (as noted above)
- Extend Postgres DB functionality (more flexible parsing of Kafka messages)
- Make other Kafka defaults (like consumer group) explicit / configurable
- Potentially use a class for the Kafka Consumer/Producer
- Add proper logging (and remove print statements)