Skip to content

Commit 1eda0fb

Browse files
authored
Confluent Cloud custom connector tutorial (#1553)
1 parent 450a9a1 commit 1eda0fb

24 files changed

+444
-2
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
dev:
2+
steps:
3+
- title: Initialize the project
4+
content:
5+
- action: execute
6+
file: tutorial-steps/dev/init.sh
7+
render:
8+
file: tutorials/custom-connector/confluent/markup/dev/init.adoc
9+
10+
- title: Provision your Kafka cluster
11+
content:
12+
- action: skip
13+
render:
14+
file: tutorials/custom-connector/confluent/markup/dev/ccloud-supported-provider-region.adoc
15+
- action: skip
16+
render:
17+
file: shared/markup/ccloud/ccloud-setup-self.adoc
18+
19+
- title: Develop the custom connector
20+
content:
21+
- action: skip
22+
render:
23+
file: tutorials/custom-connector/confluent/markup/dev/write-connector.adoc
24+
25+
- title: Package the custom connector
26+
content:
27+
- action: skip
28+
render:
29+
file: tutorials/custom-connector/confluent/markup/dev/package-connector.adoc
30+
31+
- title: Add the custom connector plugin to Confluent Cloud
32+
content:
33+
- action: skip
34+
render:
35+
file: tutorials/custom-connector/confluent/markup/dev/add-connector-plugin.adoc
36+
37+
- title: Run the custom connector on Confluent Cloud
38+
content:
39+
- action: skip
40+
render:
41+
file: tutorials/custom-connector/confluent/markup/dev/provision-connector.adoc
42+
43+
- title: Monitor the custom connector on Confluent Cloud
44+
content:
45+
- action: skip
46+
render:
47+
file: tutorials/custom-connector/confluent/markup/dev/verify-connector.adoc
48+
49+
- title: Delete the custom connector
50+
content:
51+
- action: skip
52+
render:
53+
file: tutorials/custom-connector/confluent/markup/dev/delete-connector.adoc

_data/tutorials.yaml

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,8 +206,8 @@ aggregating-count:
206206
criteria?
207207
introduction: Suppose you have a topic with events that represent ticket sales for
208208
movies. In this tutorial, you'll see an example of 'groupby count' in Kafka Streams,
209-
ksqlDB, and Flink SQL. We'll write a program that calculates the total number of tickets
210-
sold per movie.
209+
ksqlDB, and Flink SQL. We'll write a program that calculates the total number
210+
of tickets sold per movie.
211211
status:
212212
ksql: enabled
213213
kstreams: enabled
@@ -358,6 +358,18 @@ generate-test-data-streams:
358358
ksql: enabled
359359
kstreams: disabled
360360
kafka: enabled
361+
custom-connector:
362+
title: Build and run a custom connector on Confluent Cloud
363+
meta-description: Build and run a custom connector on Confluent Cloud
364+
canonical: confluent
365+
slug: /custom-connector
366+
introduction: 'While Confluent Cloud offers many pre-built <a href=''https://docs.confluent.io/cloud/current/connectors/index.html#supported-connectors''>managed
367+
connectors</a>, you may also upload and run custom connectors on Confluent Cloud — either an existing open source connector that you''d like to use,
368+
or a connector that you''ve written. In this tutorial, we''ll write a simple source connector plugin, package it so that it can be uploaded to Confluent Cloud, and then run
369+
the connector. As a developer, you want to write code. Let Confluent Cloud do the rest.'
370+
question: How do I write, package, and run a custom connector on Confluent Cloud?
371+
status:
372+
confluent: enabled
361373
aggregating-average:
362374
title: Compute an average aggregation
363375
meta-description: compute an average aggregation like count or sum
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
tutorial-steps/dev/outputs/
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
STEPS_DIR := tutorial-steps
2+
DEV_OUTPUTS_DIR := $(STEPS_DIR)/dev/outputs
3+
TEMP_DIR := $(shell mktemp -d)
4+
SEQUENCE := "dev, test, prod, ccloud"
5+
6+
tutorial:
7+
harness-runner ../../../../../_data/harnesses/custom-connector/confluent.yml $(TEMP_DIR) $(SEQUENCE)
8+
reset
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--~
3+
~ Copyright 2023 Confluent Inc.
4+
~
5+
~ Licensed under the Apache License, Version 2.0 (the "License");
6+
~ you may not use this file except in compliance with the License.
7+
~ You may obtain a copy of the License at
8+
~
9+
~ http://www.apache.org/licenses/LICENSE-2.0
10+
~
11+
~ Unless required by applicable law or agreed to in writing, software
12+
~ distributed under the License is distributed on an "AS IS" BASIS,
13+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
~ See the License for the specific language governing permissions and
15+
~ limitations under the License.
16+
~-->
17+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
18+
<modelVersion>4.0.0</modelVersion>
19+
20+
<groupId>io.confluent.developer</groupId>
21+
<artifactId>kafka-connect-counter</artifactId>
22+
<version>0.0.1-SNAPSHOT</version>
23+
<packaging>jar</packaging>
24+
25+
<properties>
26+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
27+
<maven.compiler.source>1.8</maven.compiler.source>
28+
<maven.compiler.target>1.8</maven.compiler.target>
29+
</properties>
30+
31+
<name>kafka-connect-counter</name>
32+
33+
<description>
34+
A dummy Kafka Connect source connector that emits events containing incrementing numbers
35+
</description>
36+
37+
<dependencies>
38+
<dependency>
39+
<groupId>org.apache.kafka</groupId>
40+
<artifactId>connect-api</artifactId>
41+
<version>3.4.0</version>
42+
</dependency>
43+
44+
<!-- logging -->
45+
<dependency>
46+
<groupId>org.slf4j</groupId>
47+
<artifactId>slf4j-api</artifactId>
48+
<version>2.0.7</version>
49+
</dependency>
50+
<dependency>
51+
<groupId>org.slf4j</groupId>
52+
<artifactId>slf4j-reload4j</artifactId>
53+
<version>2.0.7</version>
54+
</dependency>
55+
</dependencies>
56+
57+
<build>
58+
<plugins>
59+
<plugin>
60+
<groupId>io.confluent</groupId>
61+
<version>0.11.1</version>
62+
<artifactId>kafka-connect-maven-plugin</artifactId>
63+
<executions>
64+
<execution>
65+
<goals>
66+
<goal>kafka-connect</goal>
67+
</goals>
68+
<configuration>
69+
<title>Counter Kafka Connector</title>
70+
<description>Demo connector that emits events with incrementing long values</description>
71+
<ownerUsername>confluentinc</ownerUsername>
72+
<ownerType>organization</ownerType>
73+
<ownerName>Confluent, Inc.</ownerName>
74+
<ownerUrl>https://confluent.io/</ownerUrl>
75+
<componentTypes>
76+
<componentType>source</componentType>
77+
</componentTypes>
78+
<confluentControlCenterIntegration>true
79+
</confluentControlCenterIntegration>
80+
<singleMessageTransforms>false
81+
</singleMessageTransforms>
82+
<supportedEncodings>UTF-8</supportedEncodings>
83+
</configuration>
84+
</execution>
85+
</executions>
86+
</plugin>
87+
</plugins>
88+
</build>
89+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package io.confluent.developer.connect;
2+
3+
import org.apache.kafka.common.config.ConfigDef;
4+
import org.apache.kafka.connect.connector.Task;
5+
import org.apache.kafka.connect.source.SourceConnector;
6+
7+
import java.util.ArrayList;
8+
import java.util.HashMap;
9+
import java.util.List;
10+
import java.util.Map;
11+
12+
public class CounterConnector extends SourceConnector {
13+
14+
private Map<String, String> props;
15+
16+
@Override
17+
public String version() {
18+
return CounterConnector.class.getPackage().getImplementationVersion();
19+
}
20+
21+
@Override
22+
public void start(Map<String, String> props) {
23+
this.props = props;
24+
}
25+
26+
@Override
27+
public void stop() {
28+
}
29+
30+
@Override
31+
public Class<? extends Task> taskClass() {
32+
return CounterSourceTask.class;
33+
}
34+
35+
@Override
36+
public List<Map<String, String>> taskConfigs(int maxTasks) {
37+
List<Map<String, String>> taskConfigs = new ArrayList<>();
38+
for (int i = 0; i < maxTasks; i++) {
39+
Map<String, String> taskConfig = new HashMap<>(this.props);
40+
taskConfig.put(CounterSourceTask.TASK_ID, Integer.toString(i));
41+
taskConfigs.add(taskConfig);
42+
}
43+
return taskConfigs;
44+
}
45+
46+
@Override
47+
public ConfigDef config() {
48+
return CounterConnectorConfig.conf();
49+
}
50+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package io.confluent.developer.connect;
2+
3+
import org.apache.kafka.common.config.AbstractConfig;
4+
import org.apache.kafka.common.config.ConfigDef;
5+
import org.apache.kafka.common.config.ConfigDef.Importance;
6+
import org.apache.kafka.common.config.ConfigDef.Type;
7+
8+
import java.util.Map;
9+
10+
public class CounterConnectorConfig extends AbstractConfig {
11+
12+
public static final String KAFKA_TOPIC_CONF = "kafka.topic";
13+
private static final String KAFKA_TOPIC_DOC = "Topic to write to";
14+
public static final String INTERVAL_CONF = "interval.ms";
15+
private static final String INTERVAL_DOC = "Interval between messages (ms)";
16+
17+
public CounterConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
18+
super(config, parsedConfig);
19+
}
20+
21+
public CounterConnectorConfig(Map<String, String> parsedConfig) {
22+
this(conf(), parsedConfig);
23+
}
24+
25+
public static ConfigDef conf() {
26+
return new ConfigDef()
27+
.define(KAFKA_TOPIC_CONF, Type.STRING, Importance.HIGH, KAFKA_TOPIC_DOC)
28+
.define(INTERVAL_CONF, Type.LONG, 1_000L, Importance.HIGH, INTERVAL_DOC);
29+
}
30+
31+
public String getKafkaTopic() {
32+
return this.getString(KAFKA_TOPIC_CONF);
33+
}
34+
35+
public Long getInterval() {
36+
return this.getLong(INTERVAL_CONF);
37+
}
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package io.confluent.developer.connect;
2+
3+
import org.apache.kafka.connect.data.Schema;
4+
import org.apache.kafka.connect.source.SourceRecord;
5+
import org.apache.kafka.connect.source.SourceTask;
6+
7+
import java.util.Collections;
8+
import java.util.List;
9+
import java.util.Map;
10+
11+
public class CounterSourceTask extends SourceTask {
12+
13+
public static final String TASK_ID = "task.id";
14+
public static final String CURRENT_ITERATION = "current.iteration";
15+
16+
private CounterConnectorConfig config;
17+
private String topic;
18+
private long interval;
19+
private long count = 0L;
20+
private int taskId;
21+
private Map sourcePartition;
22+
23+
@Override
24+
public String version() {
25+
return CounterSourceTask.class.getPackage().getImplementationVersion();
26+
}
27+
28+
@Override
29+
public void start(Map<String, String> props) {
30+
config = new CounterConnectorConfig(props);
31+
topic = config.getKafkaTopic();
32+
interval = config.getInterval();
33+
taskId = Integer.parseInt(props.get(TASK_ID));
34+
sourcePartition = Collections.singletonMap(TASK_ID, taskId);
35+
36+
Map<String, Object> offset = context.offsetStorageReader().offset(sourcePartition);
37+
if (offset != null) {
38+
// the offset contains our next state, so restore it as is
39+
count = ((Long) offset.get(CURRENT_ITERATION));
40+
}
41+
}
42+
43+
@Override
44+
public List<SourceRecord> poll() throws InterruptedException {
45+
46+
if (interval > 0) {
47+
try {
48+
Thread.sleep(interval);
49+
} catch (InterruptedException e) {
50+
Thread.interrupted();
51+
return null;
52+
}
53+
}
54+
55+
Map sourceOffset = Collections.singletonMap(CURRENT_ITERATION, count + 1);
56+
57+
final List<SourceRecord> records = Collections.singletonList(new SourceRecord(
58+
sourcePartition,
59+
sourceOffset,
60+
topic,
61+
null,
62+
Schema.INT64_SCHEMA,
63+
count
64+
));
65+
count++;
66+
return records;
67+
}
68+
69+
@Override
70+
public void stop() {
71+
}
72+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
mkdir -p custom-connector/src/main/java/io/confluent/developer/connect/ && cd custom-connector
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
mkdir -p src/main/java/io/confluent/developer/connect

0 commit comments

Comments
 (0)