- Playground - DDD Aggregates w/ CDC and CQRS
- Elevator Pitch
- What is in It?
- References
- The Story
- Solution
- Playground Environment
- DEMO
- From Playground to Arena (i.e. The Cloud)
- Development Notes (Step-by-Step)
- Development Troubleshooting Notes
- My Discoveries (What I Learned)
This repository "playground" delivers a solution design with demo-able code that is colored with a fun, fictional story providing a "magical journey from context to code using my grandkids love of snacks as inspiration to bring it all together".
Getting acquainted with this material will leave you a certified Solution Snackitect with an understanding of applying CDC, CQRS, DDD, Kafka Streams, and Flink SQL into an end-to-end solution.
This repository is packed with concepts, drawings, slides, and code related to a working "playground" building DDD Aggregates via a CDC-CQRS data stream.
The Presentation Slides outline a solution journey from story (requirements) to solution (architecture & design) to code. The goal of this demo is to give context to code and provide a pragmatic approach for designing software using DDD as a key driver.
This demo highlights a CDC-CQRS pipeline between a normalized relational database, MySQL, as the command database and a de-normalized NoSQL database, MongoDB, as the query database resulting in the creation of DDD Aggregates via Debezium & Kafka-Streams.
The Snacks Unlimited "Store" source code is centered around three microservices:
- snack-order-commands
- snack-order-processor
- snack-customer-orders
These services are implemented as Spring-Boot applications in Java.
The snack-order-commands service exposes REST API endpoints that persist item-details, shipping-info, and payment in their respective tables in the MySQL database. Debezium tails the MySQL binlogs to capture change events in these tables and publishes messages to Kafka topics. These topics are consumed by snack-order-processor, a Kafka Streams application that joins the data from these topics into an Order Aggregate (CustomerOrder) object, which is then published to a customer-order-aggregate topic.
NOTE: The snack-order-processor is a Kafka Streams application. The source code also includes a Flink SQL job that can be used as an alternative to Kafka Streams. The Flink SQL job defined in snack-order-flinksql also joins the data to create a Customer Order Aggregate that is inserted into the customer-order-aggregate topic.
This customer-order-aggregate topic is consumed by the MongoDB Sink Connector, and the data is persisted in MongoDB, which is served by the snack-customer-orders service.
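For readers who want to see roughly what that join looks like in code, below is a minimal Kafka Streams sketch. The topic names, key type, placeholder record models, and reliance on default Serdes are assumptions made for illustration only; the actual topology lives in the snack-order-processor source.

```java
// Illustrative sketch only -- topic names, keys, and model records are assumptions,
// not the repository's actual classes (which live under model.domain / model.inbound).
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class OrderAggregateTopologySketch {

    // Minimal placeholder models; the real demo uses richer domain classes.
    record ItemDetail(String orderId, String snack, int quantity) {}
    record ShippingLocation(String orderId, String customerName, String address) {}
    record Payment(String orderId, String status) {}
    record CustomerOrder(String orderId, ItemDetail item, ShippingLocation shipping, Payment payment) {}

    public void build(StreamsBuilder builder) {
        // Each CDC topic is read as a changelog table keyed by order id (assumed key);
        // a real topology would also configure explicit Serdes for each type.
        KTable<String, ItemDetail> items = builder.table("order-command-server.order-command-db.item_detail");
        KTable<String, ShippingLocation> shipping = builder.table("order-command-server.order-command-db.shipping_location");
        KTable<String, Payment> payments = builder.table("order-command-server.order-command-db.payment");

        // Join the three command-side tables into a single read-side aggregate...
        KTable<String, CustomerOrder> aggregate = items
                .join(shipping, (item, ship) -> new CustomerOrder(item.orderId(), item, ship, null))
                .join(payments, (order, pay) -> new CustomerOrder(order.orderId(), order.item(), order.shipping(), pay));

        // ...and publish it for the MongoDB sink connector to persist.
        aggregate.toStream().to("customer-order-aggregate");
    }
}
```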
The Snacks Unlimited "Store" source code also includes a Next.js frontend application called snickers-promotion-app, a simple web application that submits order commands for a Snickers. It is used for demo purposes.
- Source Material
- Telling the Story - Methods
- Solutioning
- Development
Snacks Unlimited Order Management System.
Below is a breakdown of a fictitious but relatable problem and the solution journey to building a scalable, flexible system using CQRS with CDC and Kafka.
- Get started: install the WarpStream Agent
The "platform" consists of:
- Confluent Platform (Docker) - Kafka, Kafka Connect
- Apache Flink - SQL Client
- Kafka Connectors for MySQL source and MongoDB sink
- MySQL and "Adminer" (manage MySQL via browser)
- MongoDB and Mongo Express (manage Mongo via browser)
The steps below can be executed from the ./workspace directory.
from the <ROOT> directory of this repository
$ cd ./workspace
Start and Stop the Platform
$ ./platform kafka|redpanda|warpstream start
$ ./platform kafka|redpanda|warpstream stop
Create the MySQL source and MongoDB sink Kafka Connectors
NOTE: Platform containers must be started and ready.
$ ./platform init-connectors
Creating MySQL SourceConnector...
201
Creating MongoDB Sink Connector...
201
Build the Apps
$ ./apps build
$ ./docker-clean
Start the Apps with Kafka Streams Processing
This will start the following Spring Boot applications:
- snack-order-commands (Write / MySQL)
- snack-customer-orders (Read / MongoDB)
- snack-order-processor (Anti-corruption / Kafka Streams)
$ ./apps start --kstreams
Start the Apps with FlinkSQL Processing
This will start the following Spring Boot applications:
- snack-order-commands (Write / MySQL)
- snack-customer-orders (Read / MongoDB)
- FlinkSQL: streaming job with INSERT command
$ ./apps start --flinksql
Stop the Apps
$ ./apps stop
Run FlinkSQL Command-Line
$ ./apps cli flinksql
The following commands will stop the docker containers and perform a docker cleanup to remove/prune any dangling docker images, containers, and volumes.
$ ./apps stop
$ ./platform kafka|redpanda stop
$ ./docker-clean
-- Demo with Kafka Streams
$ cd ./workspace
$ ./platform redpanda|kafka start
$ ./platform init-connectors
Creating MySQL SourceConnector...
201
Creating MongoDB Sink Connector...
201
$ ./apps start --kstreams
-- Demo with Flink SQL
$ cd ./workspace
$ ./platform redpanda|kafka start
$ ./apps start --flinksql
$ ./apps stop
$ ./platform kafka|redpanda stop
$ ./docker-clean
See Snacks Unlimited - Postman Collection.
--- DEMO START ---
- 1 - Create Kafka Connectors
  - POST Create MySQL CDC Connector
  - POST Create MongoDB Sink Connector
- 2a - Brady's Order
  - POST Snack Items
  - POST Shipping Location
  - POST Payment
- 2b - Oakley's Order
  - POST Snack Items
  - POST Shipping Location
  - POST Payment
- 2c - Rylan's Order
  - POST Snack Items
  - POST Shipping Location
  - POST Payment
- 3 - Fulfill Orders (Sam the Wizard)
  - GET PAID Orders
  - PUT Order Fulfillment
- 4 - Ship Orders (Minions Delivery Team)
  - GET FULFILLED Orders
  - PUT Order Shipment
--- DEMO COMPLETE ---
- 1 - Open Browser to http://{host}:3000
- 2 - Fulfill Orders (Sam the Wizard)
  - GET PAID Orders
  - PUT Order Fulfillment
- 3 - Ship Orders (Minions Delivery Team)
  - GET FULFILLED Orders
  - PUT Order Shipment
- Flink SQL Shell
-- Open Flink SQL Shell
$ cd ./workspace
$ ./apps flinksql-cli
Flink SQL>
- Kafka CLI
- List Kafka Topics
$ kafka-topics \
    --bootstrap-server $BOOTSTRAP_SERVER \
    --command-config $CC_CONFIG \
    --list
- Consume Kafka Topic Records
$ kafka-console-consumer \
    --topic "$1" \
    --bootstrap-server $BOOTSTRAP_SERVER \
    --property "schema.registry.url=$SCHEMA_REGISTRY_URL" \
    --property "basic.auth.credentials.source=USER_INFO" \
    --property "basic.auth.user.info=$SCHEMA_AUTH" \
    --property "print.key=true" \
    --property "key.separator= ==> " \
    --consumer.config $CC_CONFIG \
    --consumer-property "group.id=$CONSUMER_GRP" \
    --from-beginning   # OPTIONAL
The EC2 instance needs enough CPU, memory, and storage to run the 12-14 Snacks Unlimited containers.
Based on local Docker Desktop stats, the following EC2 instance type is a good fit for running the demo containers on AWS.
t3.large
- vCPUs: 2
- Memory: 8GB
- Storage: 30GB
Launch EC2 Instance
Go to EC2 Dashboard → Instances → Launch Instance
Name: demo-snacks-unlimited
AMI: Amazon Linux 2 AMI (HVM), SSD Volume Type
Instance type: t3.large
Key pair: Create a New Key pair `demo-snacks-unlimited`
Network settings:
- Default VPC
- Security Group: demo-snacks-unlimited-sg
- Auto Assign Public IP
- Allow SSH (port 22) from your IP
- Allow Next.js App (port 3000) from your IP
- Allow Snacks Unlimited APIs (ports 8800-8802) from your IP
Storage: 30 GB
Launch
Connect to EC2
ssh -i /path/to/your-key.pem ec2-user@<public-ip>
Initial Setup Scripts
Install Git and Docker
$ sudo dnf update -y
$ sudo yum update -y
$ sudo yum install git -y
$ sudo dnf install -y docker
$ sudo systemctl enable docker
$ sudo systemctl start docker
$ sudo usermod -aG docker ec2-user
Test Docker
$ docker version
$ docker info
Install Docker Compose
$ mkdir -p ~/.docker/cli-plugins
$ curl -SL https://github.com/docker/compose/releases/download/v2.24.1/docker-compose-linux-x86_64 -o ~/.docker/cli-plugins/docker-compose
$ chmod +x ~/.docker/cli-plugins/docker-compose
$ docker compose version
Install Java
$ sudo dnf install -y java-17-amazon-corretto
$ java --version
OpenJDK Runtime Environment Corretto-17.0.14.7.1 (build 17.0.14+7-LTS)
OpenJDK 64-Bit Server VM Corretto-17.0.14.7.1 (build 17.0.14+7-LTS, mixed mode, sharing)
Configure NGINX to Secure Next.js app with HTTPS
SSH into demo-snacks-unlimited EC2 instance.
Install nginx
$ sudo yum install nginx -y
Install Certbot (Let's Encrypt SSL tool)
$ sudo yum install certbot python3-certbot-nginx -y
Configure NGINX as a reverse proxy. Edit /etc/nginx/nginx.conf and add a server block with a location that proxies to the Next.js app:
server {
    listen 80;
    server_name {your-domain.com};

    location / {
        proxy_pass http://localhost:3000;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection 'upgrade';
        proxy_set_header Host $host;
        proxy_cache_bypass $http_upgrade;
    }
}
Test
$ sudo nginx -t
$ sudo systemctl enable nginx
$ sudo systemctl start nginx
$ sudo systemctl reload nginx
Get and Install SSL Cert
This only works if you have registered a domain with a registrar such as Route 53, Namecheap, GoDaddy, or Google Domains.
$ sudo certbot --nginx -d your-domain.com -d {your-domain.com}
Clone Snacks Unlimited Demo
$ mkdir ~/demos
$ cd ~/demos
$ git clone https://github.com/improving-minnesota/demo-ordermgmt-cqrs-cdc.git snacks-unlimited-demo
$ cd ~/demos/snacks-unlimited-demo/workspace
Build App Images
$ ./apps build
$ ./docker-clean
Startup the Platform and Apps
These are the combinations I found to work on this EC2 instance. Not sure why, but the Flink Job Manager had issues running the Flink SQL job with Redpanda as the broker. NOTE: All combinations worked fine on a MacBook using Docker Desktop. I suspect a network configuration issue but have not looked into it.
Combo 1
$ ./platform redpanda start
$ ./apps start --kstreams
Combo 2
$ ./platform kafka start
$ ./apps start --kstreams
Combo 3
$ ./platform kafka start
$ ./apps start --flinksql
Combo 4 (failed)
$ ./platform redpanda start
$ ./apps start --flinksql
MySQL Source Connector and MongoDB Sink Connector
$ ./platform init-connectors
After Instance is Started
If the EC2 instance was stopped, run the following commands after it starts again to bring the platform and apps back up. SSH into the instance:
$ cd ~/demos/snacks-unlimited-demo/workspace
Clean up to be safe
$ ./platform redpanda|kafka stop
$ ./apps stop
$ ./docker-clean
Start the Platform and Apps
(See startup combinations in previous section)
$ ./apps cli flinksql
-- ALL customer order aggregates
Flink SQL> select orderId, orderStatus, shippingLocation.customerName from customer_order_aggregate;
-- PAID customer order aggregates
Flink SQL> select orderId, orderStatus, shippingLocation.customerName from customer_order_aggregate where orderStatus = 'PAID';
This is a transactional service that triggers the overall CQRS workflow by initiating order commands:
- POST order items (api/item)
- POST shipping details (api/shipping)
- POST payment details (api/payment)
$ spring init -d=web,jpa,mysql --build=gradle --type=gradle-project-kotlin snack-order-commands
Create the base packages:
- api.rest
- data
- exception
- service
- see src/main/java/api/rest/OrderWriteController.java
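As a rough illustration of the command-side controller shape (the real OrderWriteController may differ), here is a hedged sketch; the DTO records and the OrderCommandService interface are placeholders invented for this example.

```java
// Hypothetical sketch of the command-side controller; request bodies, DTO fields,
// and service methods are placeholders, not the repository's exact signatures.
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api")
public class OrderWriteControllerSketch {

    // Minimal placeholder types for the sketch.
    record ItemDetailDto(String orderId, String snack, int quantity) {}
    record ShippingDto(String orderId, String customerName, String address) {}
    record PaymentDto(String orderId, String status) {}

    // Assumed service (service package) that persists each command via JPA.
    public interface OrderCommandService {
        ItemDetailDto saveItem(ItemDetailDto item);
        ShippingDto saveShipping(ShippingDto shipping);
        PaymentDto savePayment(PaymentDto payment);
    }

    private final OrderCommandService service;

    public OrderWriteControllerSketch(OrderCommandService service) {
        this.service = service;
    }

    // Each POST persists a row in its MySQL table; Debezium then picks the change up from the binlog.
    @PostMapping("/item")
    public ResponseEntity<ItemDetailDto> createItem(@RequestBody ItemDetailDto item) {
        return new ResponseEntity<>(service.saveItem(item), HttpStatus.CREATED);
    }

    @PostMapping("/shipping")
    public ResponseEntity<ShippingDto> createShipping(@RequestBody ShippingDto shipping) {
        return new ResponseEntity<>(service.saveShipping(shipping), HttpStatus.CREATED);
    }

    @PostMapping("/payment")
    public ResponseEntity<PaymentDto> createPayment(@RequestBody PaymentDto payment) {
        return new ResponseEntity<>(service.savePayment(payment), HttpStatus.CREATED);
    }
}
```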
This service consumes the CDC topics (populated by writes from snack-order-commands) and joins them into an Order Aggregate for reading.
$ spring init -d=web --build=gradle --type=gradle-project-kotlin snack-order-processor
Add in the Kafka dependencies in build.gradle.kts:
implementation("org.springframework.kafka:spring-kafka")
implementation("org.apache.kafka:kafka-streams")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.kafka:spring-kafka-test")
Create the base packages:
- config (Kafka Configuration)
- model.domain
- model.inbound
- stream (Kafka Stream topology and Serdes configuration)
- Add the following line to OrderAggregateStream:
See TopologyController class
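For illustration, a topology endpoint can be as small as the hedged sketch below. It assumes the default StreamsBuilderFactoryBean registered by Spring Kafka's @EnableKafkaStreams support is available for injection; the repository's actual TopologyController may be wired differently.

```java
// Hedged sketch of an endpoint that exposes the Kafka Streams topology description.
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TopologyControllerSketch {

    private final StreamsBuilderFactoryBean factoryBean;

    public TopologyControllerSketch(StreamsBuilderFactoryBean factoryBean) {
        this.factoryBean = factoryBean;
    }

    // Returns the topology description as plain text, ready to paste into kafka-streams-viz.
    @GetMapping("/api/topology")
    public String topology() {
        return factoryBean.getTopology().describe().toString();
    }
}
```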
- (OPTIONAL) Add this to application.yml to enable actuator endpoints.
management:
  endpoints:
    web:
      exposure:
        include: "*"
- From Browser: http://localhost:8801/api/topology
- Paste the topology output into https://zz85.github.io/kafka-streams-viz/
- View the Topology
This is a query service that reads from a MongoDB datastore and represents the read segregated responsibilities of CQRS.
Query commands:
- GET api/orders/{orderId}
$ spring init -d=web,data-mongodb --build=gradle --type=gradle-project-kotlin snack-customer-orders
Create the base packages:
- api.rest
- api.model
- data.repository
- service
- see src/main/java/api/rest/OrderWriteController.java
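As a rough illustration of the read side (the actual classes under api.rest and data.repository may differ), here is a hedged sketch of a Spring Data MongoDB repository and query controller; the document shape loosely mirrors the _id / customerOrder structure produced by the sink connector but is a simplified placeholder.

```java
// Illustrative sketch only -- the document, repository, and controller are placeholders.
import java.util.Optional;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

// Collection written by the MongoDB sink connector; the aggregate payload is left untyped here.
@Document(collection = "customerOrder")
record CustomerOrderDocumentSketch(@Id String id, Object customerOrder) {}

interface CustomerOrderRepositorySketch extends MongoRepository<CustomerOrderDocumentSketch, String> {}

@RestController
@RequestMapping("/api/orders")
class OrderReadControllerSketch {

    private final CustomerOrderRepositorySketch repository;

    OrderReadControllerSketch(CustomerOrderRepositorySketch repository) {
        this.repository = repository;
    }

    // Query side of CQRS: read the denormalized aggregate straight from MongoDB.
    @GetMapping("/{orderId}")
    ResponseEntity<CustomerOrderDocumentSketch> byOrderId(@PathVariable String orderId) {
        Optional<CustomerOrderDocumentSketch> order = repository.findById(orderId);
        return order.map(ResponseEntity::ok).orElse(ResponseEntity.notFound().build());
    }
}
```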
With the Platform Running (See Above).
POST http://localhost:8083/connectors
--
{
  "name": "order-command-db-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "topic.prefix": "order-command-server",
    "database.hostname": "mysql_db_server",
    "database.port": "3306",
    "database.user": "order-command-user",
    "database.password": "password",
    "database.server.id": "142401",
    "database.server.name": "order-command-server",
    "database.whitelist": "order-command-db",
    "table.whitelist": "order-command-db.payment, order-command-db.shipping_location,order-command-db.item_detail",
    "schema.history.internal.kafka.bootstrap.servers": "broker:29092",
    "schema.history.internal.kafka.topic": "dbhistory.order-command-db",
    "include.schema.changes": "true",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}
With the Platform Running (See Above).
POST http://localhost:8083/connectors
--
{
  "name": "order-app-mongo-sink-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "customer-order-aggregate",
    "connection.uri": "mongodb://mongo-user:password@mongodb_server:27017",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "database": "customer_order_db",
    "collection": "customerOrder",
    "document.id.strategy.overwrite.existing": "true",
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy",
    "transforms": "hk,hv",
    "transforms.hk.type": "org.apache.kafka.connect.transforms.HoistField$Key",
    "transforms.hk.field": "_id",
    "transforms.hv.type": "org.apache.kafka.connect.transforms.HoistField$Value",
    "transforms.hv.field": "customerOrder"
  }
}
org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'dataSourceScriptDatabaseInitializer'
This is due to the JPA Data Source not being configured correctly. This can be fixed in the following way:
- Add the data source configuration to application.yml.
spring:
  datasource:
    url: jdbc:mysql://localhost:13306/order-command-db
    username: order-command-user
    password: password
    driver-class-name: com.mysql.cj.jdbc.Driver
  jpa:
    generate-ddl: true
    show-sql: true
    hibernate:
      ddl-auto: update
    properties:
      hibernate:
        format_sql: true
        dialect: org.hibernate.dialect.MySQLDialect
Add this to your build.gradle.kts:
testImplementation("com.h2database:h2")
The @DataJpaTest annotation then starts up an in-memory H2 database to run the JPA repository tests.
When running the tests against H2, you do not need the MySQL dialect in application.yml:
# dialect: org.hibernate.dialect.MySQLDialect
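For context, here is a hedged example of the kind of repository test that @DataJpaTest enables; the entity and repository names are placeholders, not the repository's actual classes.

```java
// Hedged sketch -- ItemDetail and ItemDetailRepository are assumed application classes.
import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest;

@DataJpaTest   // bootstraps only the JPA slice, backed by the in-memory H2 database
class ItemDetailRepositoryTest {

    @Autowired
    private ItemDetailRepository repository;   // placeholder repository under test

    @Test
    void persistsAndReadsBack() {
        ItemDetail saved = repository.save(new ItemDetail("Snickers", 3));   // placeholder entity
        assertThat(repository.findById(saved.getId())).isPresent();
    }
}
```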
Debezium MySQLConnector Error: Access denied; you need the SUPER, RELOAD, or FLUSH_TABLES privilege for this operation
Grant the required privileges to the Debezium MySQL connector user:
GRANT ALL PRIVILEGES ON *.* TO 'order-command-user';
I learned a lot, so this is not an exhaustive list.
| Item | Discovery Notes |
|---|---|
| Using @DataJpaTest for JPA Repository Tests | This annotation bootstraps the test with an in-memory H2 SQL database. |
| Leverage Spring profiles for ./gradlew bootRun, ./apps start (local-docker), and local-embedded for testing | See src/main/resources/application.yml |
| Access privileges for CDC Connector | Check out ./docker/mysql/init/init-db.sql to grant privileges to the order-command-user. |
| Flink SQL Client | Tables (connectors) are created in memory and are "gone" after the session is closed. The Kafka topics the tables connect to must be created before a Flink job (using the INSERT command) will start correctly. |
| Flink SQL - 1.19.x vs 1.20.x | This repo uses Flink SQL 1.20.1. Key differences from the last stable version: 1.20.x supports the "ARRAY_AGG" keyword, making it easy to group a list of objects; the 1.20.1 JDBC and MongoDB connectors are not yet available. Keeping an eye on this in order to connect MySQL and MongoDB from the Flink SQL client. |
| Redpanda vs Kafka (Dockerized) | Redpanda containers start up quicker and offer seamless Kafka-protocol support for the applications and Flink SQL. It was fun switching between Confluent Kafka and Redpanda as I experimented with the running demo. |