
This is a demo application platform for a Snack Order Mgmt system that is implemented with CQRS using CDC (Debezium) & Kafka


improving-minnesota/demo-ordermgmt-cqrs-cdc


Playground - DDD Aggregates w/ CDC and CQRS

Elevator Pitch

This repository "playground" delivers a solution design with demo-able code that is colored with a fun, fictional story providing a "magical journey from context to code using my grandkids love of snacks as inspiration to bring it all together".

Getting acquainted with this material will leave you a certified Solution Snackitect with an understanding of how to apply CDC, CQRS, DDD, Kafka Streams, and Flink SQL in an end-to-end solution.

What is in It?

This repository is packed with concepts, drawings, slides, and code related to a working "playground" that builds DDD Aggregates via a CDC-CQRS data stream.

The Presentation Slides outline a solution journey from story (requirements) to solution (architecture & design) to code. The goal of this demo is to give context to code and provide a pragmatic approach for designing software using DDD as a key driver.

This demo highlights a CDC-CQRS pipeline between a normalized relational database (MySQL) as the command database and a de-normalized NoSQL database (MongoDB) as the query database, with DDD Aggregates created along the way via Debezium & Kafka Streams.

The Snacks Unlimited "Store" source code is centered around three microservices:

  • snack-order-commands
  • snack-order-processor
  • snack-customer-orders

These services are implemented as Spring Boot applications in Java. snack-order-commands exposes REST API endpoints that persist item details, shipping info, and payment in their respective tables in the MySQL database.

Debezium tails the MySQL binlogs to capture change events on these tables and publishes messages to Kafka topics.

These topics are consumed by snack-order-processor, a Kafka-Streams application that joins the data from these topics into an Order Aggregate (CustomerOrder) object, which is then published to the customer-order-aggregate topic.
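Below is a minimal sketch of the kind of KTable join the processor performs. The CDC topic names follow Debezium's <prefix>.<database>.<table> convention from the connector config shown later in this README; the value types, serdes, and the assumption that all three topics are keyed by order id are illustrative stand-ins, not the repo's actual classes.

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class OrderAggregateTopologySketch {

    // Hypothetical stand-ins for the repo's model.inbound / model.domain classes.
    public record ItemDetail(String orderId, String item) {}
    public record ShippingLocation(String orderId, String customerName) {}
    public record Payment(String orderId, String status) {}
    public record CustomerOrder(ItemDetail item, ShippingLocation shipping, Payment payment) {}

    public static Topology build(Serde<ItemDetail> itemSerde,
                                 Serde<ShippingLocation> shippingSerde,
                                 Serde<Payment> paymentSerde,
                                 Serde<CustomerOrder> orderSerde) {
        StreamsBuilder builder = new StreamsBuilder();

        // One changelog table per CDC topic, each assumed keyed by order id.
        KTable<String, ItemDetail> items = builder.table(
                "order-command-server.order-command-db.item_detail",
                Consumed.with(Serdes.String(), itemSerde));
        KTable<String, ShippingLocation> shipping = builder.table(
                "order-command-server.order-command-db.shipping_location",
                Consumed.with(Serdes.String(), shippingSerde));
        KTable<String, Payment> payments = builder.table(
                "order-command-server.order-command-db.payment",
                Consumed.with(Serdes.String(), paymentSerde));

        // Join the three command-side tables into one aggregate and publish it.
        items.join(shipping, (item, ship) -> new CustomerOrder(item, ship, null))
             .join(payments, (partial, pay) ->
                     new CustomerOrder(partial.item(), partial.shipping(), pay))
             .toStream()
             .to("customer-order-aggregate",
                 Produced.with(Serdes.String(), orderSerde));

        return builder.build();
    }
}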

NOTE: The snack-order-processor is a Kafka-Streams application. The source code also includes a Flink SQL Job that can be used as an alternative to Kafka-Streams. The Flink SQL job defined in snack-order-flinksql also joins data to create a Customer Order Aggregate that is inserted into the customer-order-aggregate topic.

The customer-order-aggregate topic is consumed by the MongoDB Sink Connector, and the data is persisted in MongoDB, which is served by the snack-customer-orders service.


The Snacks Unlimited "Store" source code also includes a Next.js frontend application called snickers-promotion-app. This is a simple web application that submits order commands for a Snickers. It is used for demo purposes.


References

The Story

Snacks Unlimited Order Management System.

Below is a breakdown of a fictitious but relatable problem and the solution journey to building a scalable, flexible system using CQRS with CDC and Kafka.

Problem

Domain Storytelling

Event Blueprinting

Solution

DDD Context Map

System Design: Righting Software - "The Method"

C4 Modeling - Context View

C4 Modeling - Container View

Implementation: Kafka Streams Topology

Implementation: Flink SQL Job Overview


Playground Environment

Kafka-backed Playground

Redpanda-backed Playground

Warpstream-backed Playground

Spinning up Playground "Platform"

The "platform" consists of:

  • Confluent Platform (Docker) - Kafka, Kafka Connect
  • Apache Flink - SQL Client
  • Kafka Connectors for MySQL source and MongoDB sink
  • MySQL and "Adminer" (manage MySQL via browser)
  • MongoDB and Mongo Express (manage Mongo via browser)

The steps below can be executed from the ./workspace directory.

from <ROOT> directory of this repository
$ cd ./workspace

Start and Stop the Platform

$ ./platform kafka|redpanda|warpstream start
$ ./platform kafka|redpanda|warpstream stop

Create the MySQL source and MongoDB sink Kafka Connectors

NOTE: platform containers must be started and ready.

$ ./platform init-connectors
Creating MySQL SourceConnector...
201
Creating MongoDB Sink Connector...
201

Spinning up Playground "Apps"

Build the Apps

$ ./apps build
$ ./docker-clean

Start the Apps with Kafka Streams Processing

This will start the following Spring Boot applications:

  • snack-order-commands (Write / MySQL)
  • snack-customer-orders (Read / MongoDB)
  • snack-order-processor (Anti-corruption / Kafka Streams)
$ ./apps start --kstreams

Start the Apps with FlinkSQL Processing

This will start the following Spring Boot applications:

  • snack-order-commands (Write / MySQL)
  • snack-customer-orders (Read / MongoDB)
  • FlinkSQL: streaming job with INSERT command
$ ./apps start --flinksql

Stop the Apps

$ ./apps stop

Run FlinkSQL Command-Line

$ ./apps cli flinksql

Stop and Clean the Playground

The following commands will stop the docker containers and perform a docker cleanup to remove/prune any dangling docker images, containers, and volumes.

$ ./apps stop
$ ./platform kafka|redpanda stop
$ ./docker-clean

DEMO

PRE: Start Platform and Apps

-- Demo with Kafka Streams

$ cd ./workspace
$ ./platform redpanda|kafka start
$ ./platform init-connectors
Creating MySQL SourceConnector...
201
Creating MongoDB Sink Connector...
201
$ ./apps start --kstreams
-- Demo with Flink SQL

$ cd ./workspace
$ ./platform redpanda|kafka start
$ ./apps start --flinksql

POST: Stop Platform and Apps

$ ./apps stop
$ ./platform kafka|redpanda stop
$ ./docker-clean

DEMO: API-Driven Demo (POSTMAN)

See Snacks Unlimited - Postman Collection.

--- DEMO START ---

  • 1 - Create Kafka Connectors

    • POST Create MySQL CDC Connector
    • POST Create MongoDB Sink Connector
  • 2a - Brady's Order

    • POST Snack Items
    • POST Shipping Location
    • POST Payment
  • 2b - Oakley's Order

    • POST Snack Items
    • POST Shipping Location
    • POST Payment
  • 2c - Rylan's Order

    • POST Snack Items
    • POST Shipping Location
    • POST Payment
  • 3 - Fulfill Orders (Sam the Wizard)

    • GET PAID Orders
    • PUT Order Fulfillment
  • 4 - Ship Orders (Minions Delivery Team)

    • GET FULFILLED Orders
    • PUT Order Shipment

--- DEMO COMPLETE ---

DEMO: Snickers Promotion App

  • 1 - Open Browser to http://{host}:3000

    • Click "WISH I had a Snickers" Screen 1
    • Enter your name and Click "HERE I am" Screen 1
    • Click "Complete WISH" Screen 1
    • Your Snickers Wish is Sent... Screen 1
  • 2 - Fulfill Orders (Sam the Wizard)

    • GET PAID Orders
    • PUT Order Fulfillment
  • 3 - Ship Orders (Minions Delivery Team)

    • GET FULFILLED Orders
    • PUT Order Shipment

Tools During Demo

  • Flink SQL Shell
-- Open Flink SQL Shell

$ cd ./workspace
$ ./apps flinksql-cli

Flink SQL>
  • Flink Dashboard

  • Kafka CLI

    • List Kafka Topics
    $ kafka-topics \
        --bootstrap-server $BOOTSTRAP_SERVER \
        --command-config $CC_CONFIG \
        --list
    
  • Consume Kafka Topic Records

    $ kafka-console-consumer \
      --topic "$1" \
      --bootstrap-server $BOOTSTRAP_SERVER \
      --property "schema.registry.url=$SCHEMA_REGISTRY_URL" \
      --property "basic.auth.credentials.source=USER_INFO" \
      --property "basic.auth.user.info=$SCHEMA_AUTH" \
      --property "print.key=true" \
      --property "key.separator= ==> " \
      --consumer.config $CC_CONFIG \
      --consumer-property "group.id=$CONSUMER_GRP" \
      --from-beginning   # optional
    
  • MySQL Database

  • MongoDB Database


From Playground to Arena (i.e. The Cloud)

Deploy on an AWS EC2 Instance

Single EC2 Instance Specs

The EC2 instance needs enough CPU, memory, and storage to run the 12-14 Snacks Unlimited containers.

Based on local Docker Desktop stats, the following EC2 instance type is a good fit for running the demo containers on AWS.

t3.large
- vCPUs: 2
- Memory: 8GB
- Storage: 30GB

Sandbox Setup Steps

Launch EC2 Instance

Go to EC2 Dashboard → Instances → Launch Instance

Name: demo-snacks-unlimited
AMI: Amazon Linux 2 AMI (HVM), SSD Volume Type

Instance type: t3.large
Key pair: Create a New Key pair `demo-snacks-unlimited`

Network settings:
- Default VPC
- Security Group: demo-snacks-unlimited-sg
- Auto Assign Public IP
- Allow SSH (port 22) from your IP
- Allow Next.js App (port 3000) from your IP
- Allow Snacks Unlimited APIs (ports 8800-8802) from your IP

Storage: 30 GB

Launch

Connect to EC2

ssh -i /path/to/your-key.pem ec2-user@<public-ip>

Initial Setup Scripts

Install Git and Docker
$ sudo dnf update -y
$ sudo dnf install -y git

$ sudo dnf install -y docker
$ sudo systemctl enable docker
$ sudo systemctl start docker
$ sudo usermod -aG docker ec2-user

Test Docker
$ docker version
$ docker info

Install Docker Compose
$ mkdir -p ~/.docker/cli-plugins
$ curl -SL https://github.com/docker/compose/releases/download/v2.24.1/docker-compose-linux-x86_64 -o ~/.docker/cli-plugins/docker-compose
$ chmod +x ~/.docker/cli-plugins/docker-compose
$ docker compose version

Install Java
$ sudo dnf install -y java-17-amazon-corretto

$ java --version
OpenJDK Runtime Environment Corretto-17.0.14.7.1 (build 17.0.14+7-LTS)
OpenJDK 64-Bit Server VM Corretto-17.0.14.7.1 (build 17.0.14+7-LTS, mixed mode, sharing)

Configure NGINX to Secure Next.js app with HTTPS

SSH into demo-snacks-unlimited EC2 instance.

Install nginx
$ sudo yum install nginx -y

Install Certbot (Let's Encrypt SSL tool)
$ sudo yum install certbot python3-certbot-nginx -y

Configure NGINX as a reverse proxy. Edit /etc/nginx/nginx.conf and add the following server block:

server {
    listen 80;
    server_name {your-domain.com};

    location / {
        proxy_pass http://localhost:3000;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection 'upgrade';
        proxy_set_header Host $host;
        proxy_cache_bypass $http_upgrade;
    }
}

Test

$ sudo nginx -t
$ sudo systemctl enable nginx
$ sudo systemctl start nginx
$ sudo systemctl reload nginx

Get and Install SSL Cert

This only works if you have registered a domain with a registrar such as Route 53, Namecheap, GoDaddy, or Google Domains.

$ sudo certbot --nginx -d {your-domain.com} -d {www.your-domain.com}

Clone Snacks Unlimited Demo

$ mkdir ~/demos
$ cd ~/demos
$ git clone https://github.com/improving-minnesota/demo-ordermgmt-cqrs-cdc.git snacks-unlimited-demo

Sandbox Platform & Apps

$ cd ~/demos/snacks-unlimited-demo/workspace

Build App Images

$ ./apps build
$ ./docker-clean

Startup the Platform and Apps

These are the combinations I found worked on this EC2 instance. Not sure why, but the Flink Job Manager had issues running the Flink SQL job with Redpanda as the broker. NOTE: All combinations worked fine on a MacBook using Docker Desktop. I suspect a network configuration issue but have not looked into it.

Combo 1
$ ./platform redpanda start
$ ./apps start --kstreams

Combo 2
$ ./platform kafka start
$ ./apps start --kstreams

Combo 3
$ ./platform kafka start
$ ./apps start --flinksql

Combo 4 (failed)
$ ./platform redpanda start
$ ./apps start --flinksql

MySQL Source Connector and MongoDB Sink Connector
$ ./platform init-connectors

Sandbox: Start Instance after Stopping

After Instance is Started

If you stop the EC2 instance, run the following commands after it starts back up to bring the platform and apps online. SSH into the instance:

$ cd ~/demos/snacks-unlimited-demo/workspace

Clean up to be safe
$ ./platform redpanda|kafka stop
$ ./apps stop
$ ./docker-clean

Start the Platform and Apps 
(See startup combinations in previous section)

Monitor Orders w/ FlinkSQL Client

$ ./apps cli flinksql

-- ALL customer order aggregates
Flink SQL> select orderId, orderStatus, shippingLocation.customerName from customer_order_aggregate;

-- PAID customer order aggregates
Flink SQL> select orderId, orderStatus, shippingLocation.customerName from customer_order_aggregate where orderStatus = 'PAID';

Development Notes (Step-by-Step)

Build: snack-order-commands (Write)

This is a transactional service that triggers the overall CQRS workflow by initiating order commands:

  • POST order items (api/item)
  • POST shipping details (api/shipping)
  • POST payment details (api/payment)

Step 1: Create a Spring Boot project with Gradle, Web, JPA, MySQL

$ spring init -d=web,jpa,mysql --build=gradle --type=gradle-project-kotlin snack-order-commands

Create the base packages:

  • api.rest
  • data
  • exception
  • service

Step 2: Create the Skeleton OrderController (REST endpoints)

  • see src/main/java/api/rest/OrderWriteController.java
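A hedged sketch of what this skeleton might look like; the request-body types and response codes are assumptions, and the real class lives at the path above.

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

// Skeleton write-side controller: one POST endpoint per order command.
@RestController
@RequestMapping("/api")
public class OrderWriteController {

    @PostMapping("/item")
    public ResponseEntity<Void> postItem(@RequestBody String itemJson) {
        // delegate to the service layer, which persists to MySQL via JPA
        return ResponseEntity.accepted().build();
    }

    @PostMapping("/shipping")
    public ResponseEntity<Void> postShipping(@RequestBody String shippingJson) {
        return ResponseEntity.accepted().build();
    }

    @PostMapping("/payment")
    public ResponseEntity<Void> postPayment(@RequestBody String paymentJson) {
        return ResponseEntity.accepted().build();
    }
}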

Step 3: Build Components, Tests, Config

Build: snack-order-processor (DDD Aggregate)

This service consumes the CDC topics (populated by snack-order-commands) and joins them into an Order Aggregate for reading.

Step 1: Create the Spring Boot Base Project

$ spring init -d=web --build=gradle --type=gradle-project-kotlin snack-order-processor

Add in the Kafka dependencies in build.gradle.kts:

implementation("org.springframework.kafka:spring-kafka")
implementation("org.apache.kafka:kafka-streams")

testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.kafka:spring-kafka-test")

Create the base packages:

  • config (Kafka Configuration)
  • model.domain
  • model.inbound
  • stream (Kafka Stream topology and Serdes configuration)

Step 2: Create Skeleton Model

Step 3: Configure Kafka and Streams
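A minimal sketch of the Spring wiring this step involves, assuming spring-kafka's @EnableKafkaStreams support; the application id and bootstrap address are placeholders (the repo drives these from application.yml and Spring profiles).

import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

// Registers the streams config bean that spring-kafka uses to build and
// start the topology defined in the stream package.
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = Map.of(
                StreamsConfig.APPLICATION_ID_CONFIG, "snack-order-processor",
                StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName(),
                StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return new KafkaStreamsConfiguration(props);
    }
}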

Step 4: Build Out Components, Tests

Step 5: Setup Kafka Streams Topology Visualization

  • Expose the Kafka Streams topology from OrderAggregateStream via a REST endpoint (see the TopologyController class; a hedged sketch follows below).
  • (OPTIONAL) Add this to application.yml to enable actuator endpoints.
management:
  endpoints:
    web:
      exposure:
        include: "*"

Build: snack-customer-orders (Read)

This is a query service that reads from a MongoDB datastore and represents the read segregated responsibilities of CQRS.

Query endpoints:

  • GET api/orders/{orderId}

Step 1: Create a Spring Boot project with Gradle, Web, MongoDB

$ spring init -d=web,data-mongodb --build=gradle --type=gradle-project-kotlin snack-customer-orders

Create the base packages:

  • api.rest
  • api.model
  • data.repository
  • service

Step 2: Create the Skeleton OrderController (REST endpoints)

  • see src/main/java/api/rest/OrderWriteController.java
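A hedged sketch of the read-side controller; the return type and repository wiring are assumptions based on the GET api/orders/{orderId} query listed above.

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

// Skeleton query controller: reads CustomerOrder aggregates from MongoDB.
@RestController
@RequestMapping("/api/orders")
public class OrderReadController {

    @GetMapping("/{orderId}")
    public ResponseEntity<Object> getOrder(@PathVariable String orderId) {
        // look the aggregate up via a Spring Data MongoDB repository
        return ResponseEntity.notFound().build(); // skeleton only
    }
}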

Step 3: Build Components, Tests, Config

Setup: Connectors

Step 1: Debezium CDC Connector

With the Platform Running (See Above).

POST http://localhost:8083/connectors
--
{
    "name": "order-command-db-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "topic.prefix": "order-command-server",
        "database.hostname": "mysql_db_server",
        "database.port": "3306",
        "database.user": "order-command-user",
        "database.password": "password",
        "database.server.id": "142401",
        "database.server.name": "order-command-server",
        "database.whitelist": "order-command-db",
        "table.whitelist": "order-command-db.payment, order-command-db.shipping_location,order-command-db.item_detail",
        "schema.history.internal.kafka.bootstrap.servers": "broker:29092",
        "schema.history.internal.kafka.topic": "dbhistory.order-command-db",
        "include.schema.changes": "true",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
    }
}

Step 2: MongoDB Sink Connector

With the Platform Running (See Above).

POST http://localhost:8083/connectors
--

{
    "name": "order-app-mongo-sink-connector",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "topics": "customer-order-aggregate",
        "connection.uri": "mongodb://mongo-user:password@mongodb_server:27017",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "database": "customer_order_db",
        "collection": "customerOrder",
        "document.id.strategy.overwrite.existing": "true",
        "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy",
        "transforms": "hk,hv",
        "transforms.hk.type": "org.apache.kafka.connect.transforms.HoistField$Key",
        "transforms.hk.field": "_id",
        "transforms.hv.type": "org.apache.kafka.connect.transforms.HoistField$Value",
        "transforms.hv.field": "customerOrder"
    }
}
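Either connector config can be registered by POSTing it to the Kafka Connect REST API on port 8083 (presumably what ./platform init-connectors does under the hood). A minimal sketch in Java, assuming the config JSON has been saved to a local file:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

public class RegisterConnector {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors"))
                .header("Content-Type", "application/json")
                // hypothetical file holding one of the JSON payloads above
                .POST(HttpRequest.BodyPublishers.ofFile(Path.of("connector-config.json")))
                .build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode()); // 201 means the connector was created
    }
}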


Development Troubleshooting Notes

UnsatisfiedDependencyException

org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'dataSourceScriptDatabaseInitializer'

This is due to the JPA Data Source not being configured correctly. This can be fixed in the following way:

  • Add the data source configuration to application.yml.
spring:
  datasource:
    url: jdbc:mysql://localhost:13306/order-command-db
    username: order-command-user
    password: password
    driver-class-name: com.mysql.cj.jdbc.Driver

  jpa:
    generate-ddl: true
    show-sql: true

    hibernate:
      ddl-auto: update
      show-sql: true
    properties:
      hibernate:
        format_sql: true
        dialect: org.hibernate.dialect.MySQLDialect

Dependency error configuring in-memory database for @DataJpaTest

Add this to your build.gradle.kts:

testImplementation("com.h2database:h2")

DDL error when running @DataJpaTest

This annotation starts up an in-memory database to run the JPA repository tests, so the MySQL dialect must not be forced in application.yml. Remove or comment out this line:

#        dialect: org.hibernate.dialect.MySQLDialect

Debezium MySQLConnector Error: Access denied; you need the SUPER, RELOAD, or FLUSH_TABLES privilege for this operation

You need to grant privileges to the MySQL user the Debezium connector uses. GRANT ALL is the quick fix for a demo; a locked-down setup would grant only the privileges Debezium documents (SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT).

 GRANT ALL PRIVILEGES ON *.* TO 'order-command-user';

My Discoveries (What I Learned)

I learned a lot, so this is not an exhaustive list.

  • Using @DataJpaTest for JPA repository tests: this annotation bootstraps the test with an in-memory H2 SQL database.
  • Leverage Spring profiles for ./gradlew bootRun, ./apps start (local-docker), and local-embedded testing: see src/main/resources/application.yml.
  • Access privileges for the CDC connector: check out ./docker/mysql/init/init-db.sql for how privileges are granted to the order-command-user.
  • Flink SQL Client: tables (connectors) are created in memory and are "gone" after the session is closed. The Kafka topics the tables connect to must be created before a Flink job (using the INSERT command) will start correctly.
  • Flink SQL 1.19.x vs 1.20.x: this repo uses Flink SQL 1.20.1. Key differences from the previous stable version: 1.20.x supports the ARRAY_AGG keyword, making it easy to group a list of objects; the 1.20.1 JDBC and MongoDB connectors are not yet available (keeping an eye on this to connect MySQL and MongoDB from the Flink SQL client).
  • Redpanda vs Kafka (Dockerized): Redpanda containers start up quicker and offer seamless Kafka-protocol support for the applications and Flink SQL. It was fun switching between Confluent Kafka and Redpanda as I experimented with the running demo.
