Skip to content

Commit caa1d6b

Browse files
committed
👾 Add Azure Cosmos DB Sink V2 Connector fully managed example #6608
1 parent 1f1314e commit caa1d6b

File tree

8 files changed

+210
-1
lines changed

8 files changed

+210
-1
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ jobs:
9696
# requiring ngrok
9797
"🚀1️⃣ ccloud/fm-debezium-mysql-legacy-source ccloud/fm-debezium-postgresql-legacy-source ccloud/fm-debezium-mysql-v2-source ccloud/fm-debezium-sqlserver-legacy-source ccloud/fm-debezium-sqlserver-v2-source ccloud/fm-elasticsearch-sink ccloud/fm-jdbc-sqlserver-sink ccloud/fm-jdbc-sqlserver-source ccloud/fm-ibm-mq-source ccloud/fm-jdbc-mysql-source ccloud/fm-jdbc-postgresql-sink ccloud/fm-mqtt-source ccloud/fm-debezium-postgresql-v2-source ccloud/fm-jdbc-postgresql-source",
9898

99-
"🚀 ccloud/fm-aws-cloudwatch-logs-source ccloud/fm-aws-s3-sink ccloud/fm-aws-s3-source ccloud/fm-azure-event-hubs-source ccloud/fm-azure-service-bus-source ccloud/fm-databricks-delta-lake-sink ccloud/fm-gcp-gcs-sink ccloud/fm-gcp-gcs-source ccloud/fm-aws-dynamodb-sink ccloud/fm-azure-data-lake-storage-gen2-sink ccloud/fm-azure-functions-sink ccloud/fm-azure-synapse-analytics-sink ccloud/fm-azure-cosmosdb-sink ccloud/fm-azure-cosmosdb-source ccloud/fm-azure-cosmosdb-v2-source ccloud/fully-managed-datadog-metrics-sink",
99+
"🚀 ccloud/fm-aws-cloudwatch-logs-source ccloud/fm-aws-s3-sink ccloud/fm-aws-s3-source ccloud/fm-azure-event-hubs-source ccloud/fm-azure-service-bus-source ccloud/fm-databricks-delta-lake-sink ccloud/fm-gcp-gcs-sink ccloud/fm-gcp-gcs-source ccloud/fm-aws-dynamodb-sink ccloud/fm-azure-data-lake-storage-gen2-sink ccloud/fm-azure-functions-sink ccloud/fm-azure-synapse-analytics-sink ccloud/fm-azure-cosmosdb-sink ccloud/fm-azure-cosmosdb-v2-sink ccloud/fm-azure-cosmosdb-source ccloud/fm-azure-cosmosdb-v2-source ccloud/fully-managed-datadog-metrics-sink",
100100

101101
"🚀 ccloud/fm-salesforce-cdc-source ccloud/fm-salesforce-platform-events-sink ccloud/fm-salesforce-pushtopics-source ccloud/fm-salesforce-sobject-sink ccloud/fm-snowflake-sink ccloud/fm-snowflake-source ccloud/fm-gcp-pubsub-source ccloud/fm-gcp-spanner-sink ccloud/fm-aws-cloudwatch-metrics-sink ccloud/fm-gcp-bigquery-legacy-sink ccloud/fm-gcp-bigquery-v2-sink ccloud/fm-github-source ccloud/fm-jira-source ccloud/fm-gcp-cloud-functions-legacy-sink ccloud/fm-gcp-cloud-functions-gen2-sink ccloud/fm-gcp-bigtable-sink ccloud/fm-pagerduty-sink ccloud/fm-azure-log-analytics-sink",
102102

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Fully Managed Azure Cosmos DB V2 Sink connector
2+
3+
4+
5+
## Objective
6+
7+
Quickly test [Fully Managed Azure Cosmos DB V2 Sink](https://docs.confluent.io/cloud/current/connectors/cc-azure-cosmos-sink-v2.html) connector.
8+
9+
## Prerequisites
10+
11+
See [here](https://kafka-docker-playground.io/#/how-to-use?id=%f0%9f%8c%a4%ef%b8%8f-confluent-cloud-examples)
12+
13+
14+
## How to run
15+
16+
Simply run:
17+
18+
```
19+
$ just use <playground run> command
20+
```
21+
22+
Note if you have multiple [Azure subscriptions](https://github.com/MicrosoftDocs/azure-docs-cli/blob/main/docs-ref-conceptual/manage-azure-subscriptions-azure-cli.md#change-the-active-subscription) make sure to set `AZURE_SUBSCRIPTION_NAME` environment variable to create Azure resource group in correct subscription (for confluent support, subscription is `COPS`).
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
FROM python:3
2+
3+
ADD get-data.py /
4+
5+
RUN pip install azure-cosmos
6+
7+
CMD sleep infinity
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from azure.cosmos import CosmosClient
2+
3+
import os
4+
5+
url = os.environ['AZURE_COSMOSDB_DB_ENDPOINT_URI']
6+
key = os.environ['AZURE_COSMOSDB_PRIMARY_CONNECTION_KEY']
7+
database_name = os.environ['AZURE_COSMOSDB_DB_NAME']
8+
container_name = os.environ['AZURE_COSMOSDB_CONTAINER_NAME']
9+
client = CosmosClient(url, credential=key)
10+
database = client.get_database_client(database_name)
11+
container = database.get_container_client(container_name)
12+
13+
# Enumerate the returned items
14+
import json
15+
for item in container.query_items(
16+
query='SELECT * FROM '+container_name+' r',
17+
enable_cross_partition_query=True):
18+
print(json.dumps(item, indent=True))
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
services:
3+
# https://pypi.org/project/azure-cosmos/
4+
azure-cosmos-client:
5+
build:
6+
context: ../../ccloud/fm-azure-cosmosdb-sink/azure-cosmos-client
7+
hostname: azure-cosmos-client
8+
container_name: azure-cosmos-client
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
#!/bin/bash
2+
set -e
3+
4+
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"
5+
source ${DIR}/../../scripts/utils.sh
6+
7+
login_and_maybe_set_azure_subscription
8+
9+
10+
bootstrap_ccloud_environment
11+
12+
set +e
13+
playground topic delete --topic hotels
14+
sleep 3
15+
playground topic create --topic hotels --nb-partitions 1
16+
set -e
17+
18+
19+
# https://github.com/microsoft/kafka-connect-cosmosdb/blob/dev/doc/CosmosDB_Setup.md
20+
AZURE_NAME=pgfm${USER}cs${GITHUB_RUN_NUMBER}${TAG}
21+
AZURE_NAME=${AZURE_NAME//[-._]/}
22+
if [ ${#AZURE_NAME} -gt 24 ]; then
23+
AZURE_NAME=${AZURE_NAME:0:24}
24+
fi
25+
AZURE_REGION=westeurope
26+
AZURE_RESOURCE_GROUP=$AZURE_NAME
27+
AZURE_COSMOSDB_SERVER_NAME=$AZURE_NAME
28+
AZURE_COSMOSDB_DB_NAME=$AZURE_NAME
29+
AZURE_COSMOSDB_CONTAINER_NAME=$AZURE_NAME
30+
31+
set +e
32+
log "Delete Cosmos DB instance and resource group (it might fail)"
33+
az cosmosdb delete -g $AZURE_RESOURCE_GROUP -n $AZURE_COSMOSDB_SERVER_NAME --yes
34+
az group delete --name $AZURE_RESOURCE_GROUP --yes
35+
set -e
36+
37+
log "Creating Azure Resource Group $AZURE_RESOURCE_GROUP"
38+
az group create \
39+
--name $AZURE_RESOURCE_GROUP \
40+
--location $AZURE_REGION \
41+
--tags owner_email=$AZ_USER
42+
43+
sleep 10
44+
45+
log "Creating Cosmos DB server $AZURE_COSMOSDB_SERVER_NAME"
46+
az cosmosdb create \
47+
--name $AZURE_COSMOSDB_SERVER_NAME \
48+
--resource-group $AZURE_RESOURCE_GROUP \
49+
--locations regionName=$AZURE_REGION
50+
51+
function cleanup_cloud_resources {
52+
set +e
53+
log "Delete Cosmos DB instance"
54+
check_if_continue
55+
az cosmosdb delete -g $AZURE_RESOURCE_GROUP -n $AZURE_COSMOSDB_SERVER_NAME --yes
56+
57+
log "Deleting resource group $AZURE_RESOURCE_GROUP"
58+
check_if_continue
59+
az group delete --name $AZURE_RESOURCE_GROUP --yes --no-wait
60+
}
61+
trap cleanup_cloud_resources EXIT
62+
63+
log "Create the database"
64+
az cosmosdb sql database create \
65+
--name $AZURE_COSMOSDB_DB_NAME \
66+
--resource-group $AZURE_RESOURCE_GROUP \
67+
--account-name $AZURE_COSMOSDB_SERVER_NAME \
68+
--throughput 400
69+
70+
log "Create the container"
71+
az cosmosdb sql container create \
72+
--name $AZURE_COSMOSDB_CONTAINER_NAME \
73+
--resource-group $AZURE_RESOURCE_GROUP \
74+
--account-name $AZURE_COSMOSDB_SERVER_NAME \
75+
--database-name $AZURE_COSMOSDB_DB_NAME \
76+
--partition-key-path "/id"
77+
78+
# With the Azure Cosmos DB instance setup, you will need to get the Cosmos DB endpoint URI and primary connection key. These values will be used to setup the Cosmos DB Source and Sink connectors.
79+
AZURE_COSMOSDB_DB_ENDPOINT_URI="https://${AZURE_COSMOSDB_DB_NAME}.documents.azure.com:443/"
80+
log "Cosmos DB endpoint URI is $AZURE_COSMOSDB_DB_ENDPOINT_URI"
81+
82+
# get Cosmos DB primary connection key
83+
AZURE_COSMOSDB_PRIMARY_CONNECTION_KEY=$(az cosmosdb keys list -n $AZURE_COSMOSDB_DB_NAME -g $AZURE_RESOURCE_GROUP --query primaryMasterKey -o tsv)
84+
85+
docker compose build
86+
docker compose down -v --remove-orphans
87+
docker compose up -d --quiet-pull
88+
89+
connector_name="CosmosDbSinkV2_$USER"
90+
set +e
91+
playground connector delete --connector $connector_name > /dev/null 2>&1
92+
set -e
93+
94+
log "Creating fully managed connector"
95+
playground connector create-or-update --connector $connector_name << EOF
96+
{
97+
"connector.class": "CosmosDbSinkV2",
98+
"name": "$connector_name",
99+
"kafka.auth.mode": "KAFKA_API_KEY",
100+
"kafka.api.key": "$CLOUD_KEY",
101+
"kafka.api.secret": "$CLOUD_SECRET",
102+
"input.data.format": "JSON",
103+
"azure.cosmos.account.endpoint": "$AZURE_COSMOSDB_DB_ENDPOINT_URI",
104+
"azure.cosmos.account.key": "$AZURE_COSMOSDB_PRIMARY_CONNECTION_KEY",
105+
"azure.cosmos.sink.database.name": "$AZURE_COSMOSDB_DB_NAME",
106+
"azure.cosmos.sink.containers.topicMap": "hotels#${AZURE_COSMOSDB_DB_NAME}",
107+
"azure.cosmos.sink.id.strategy": "FullKeyStrategy",
108+
"topics": "hotels",
109+
"tasks.max" : "1"
110+
}
111+
EOF
112+
wait_for_ccloud_connector_up $connector_name 180
113+
114+
115+
log "Write data to topic hotels"
116+
playground topic produce -t hotels --nb-messages 3 << 'EOF'
117+
{"id": "h1", "HotelName": "Marriott", "Description": "Marriott description"}
118+
{"id": "h2", "HotelName": "HolidayInn", "Description": "HolidayInn description"}
119+
{"id": "h3", "HotelName": "Motel8", "Description": "Motel8 description"}
120+
EOF
121+
122+
sleep 10
123+
124+
log "Verify data from Azure Cosmos DB"
125+
docker exec -e AZURE_COSMOSDB_DB_ENDPOINT_URI=$AZURE_COSMOSDB_DB_ENDPOINT_URI -e AZURE_COSMOSDB_PRIMARY_CONNECTION_KEY=$AZURE_COSMOSDB_PRIMARY_CONNECTION_KEY -e AZURE_COSMOSDB_DB_NAME=$AZURE_COSMOSDB_DB_NAME -e AZURE_COSMOSDB_CONTAINER_NAME=$AZURE_COSMOSDB_CONTAINER_NAME azure-cosmos-client bash -c "python /get-data.py" > /tmp/result.log 2>&1
126+
127+
cat /tmp/result.log
128+
grep "Motel8" /tmp/result.log
129+
130+
log "Do you want to delete the fully managed connector $connector_name ?"
131+
check_if_continue
132+
133+
playground connector delete --connector $connector_name
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#!/bin/bash
2+
3+
4+
5+
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"
6+
source ${DIR}/../../scripts/utils.sh
7+
8+
docker compose down -v --remove-orphans
9+
10+
maybe_delete_ccloud_environment
11+
12+
AZURE_NAME=pg${USER}cs${GITHUB_RUN_NUMBER}${TAG}
13+
AZURE_NAME=${AZURE_NAME//[-._]/}
14+
if [ ${#AZURE_NAME} -gt 24 ]; then
15+
AZURE_NAME=${AZURE_NAME:0:24}
16+
fi
17+
AZURE_RESOURCE_GROUP=$AZURE_NAME
18+
19+
log "Deleting resource group"
20+
az group delete --name $AZURE_RESOURCE_GROUP --yes --no-wait

docs/content-template.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@
196196
- <img src="https://github.com/vdesabou/kafka-docker-playground/raw/master/images/icons/blob_storage.png" width="15"> [Azure Blob Storage Sink](https://github.com/vdesabou/kafka-docker-playground/tree/master/ccloud/fm-azure-blob-storage-sink) :ccloud/fm-azure-blob-storage-sink:
197197
- <img src="https://github.com/vdesabou/kafka-docker-playground/raw/master/images/icons/search.png" width="15"> [Azure Cognitive Search Sink](https://github.com/vdesabou/kafka-docker-playground/tree/master/ccloud/fm-azure-cognitive-search-sink) :ccloud/fm-azure-cognitive-search-sink:
198198
- <img src="https://github.com/vdesabou/kafka-docker-playground/raw/master/images/icons/cosmosdb.png" width="15"> [Azure Cosmos DB Sink](https://github.com/vdesabou/kafka-docker-playground/tree/master/ccloud/fm-azure-cosmosdb-sink) :ccloud/fm-azure-cosmosdb-sink:
199+
- <img src="https://github.com/vdesabou/kafka-docker-playground/raw/master/images/icons/cosmosdb.png" width="15"> [Azure Cosmos DB V2 Sink](https://github.com/vdesabou/kafka-docker-playground/tree/master/ccloud/fm-azure-cosmosdb-v2-sink) :ccloud/fm-azure-cosmosdb-v2-sink:
199200
- <img src="https://github.com/vdesabou/kafka-docker-playground/raw/master/images/icons/cosmosdb.png" width="15"> [Azure Cosmos DB Source](https://github.com/vdesabou/kafka-docker-playground/tree/master/ccloud/fm-azure-cosmosdb-source) :ccloud/fm-azure-cosmosdb-source:
200201
- <img src="https://github.com/vdesabou/kafka-docker-playground/raw/master/images/icons/cosmosdb.png" width="15"> [Azure Cosmos DB V2 Source](https://github.com/vdesabou/kafka-docker-playground/tree/master/ccloud/fm-azure-cosmosdb-v2-source) :ccloud/fm-azure-cosmosdb-v2-source:
201202
- <img src="https://github.com/vdesabou/kafka-docker-playground/raw/master/images/icons/data_lake_gen1.png" width="15"> [Azure Data Lake Storage Gen2 Sink](https://github.com/vdesabou/kafka-docker-playground/tree/master/ccloud/fm-azure-data-lake-storage-gen2-sink) :ccloud/fm-azure-data-lake-storage-gen2-sink:

0 commit comments

Comments
 (0)