Skip to content

Commit 3d05c5c

Browse files
a-serebryanskiyanton-bobkovblinkov
authored
Add examples and update constraints for Kafka API (#13492)
Co-authored-by: anton-bobkov <anton-bobkov@ydb.tech> Co-authored-by: Ivan Blinkov <ivan@ydb.tech>
1 parent ccd7d90 commit 3d05c5c

35 files changed

+642
-417
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
2+
```bash
3+
kafka-console-consumer --bootstrap-server localhost:9092 \
4+
--topic my-topic \
5+
--group my-group \
6+
--from-beginning \
7+
--consumer-property check.crcs=false \
8+
--consumer-property partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
9+
```
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
2+
```bash
3+
kafka-console-consumer --bootstrap-server localhost:9092 \
4+
--topic <topic-name> \
5+
--group <consumer-name> \
6+
--from-beginning \
7+
--consumer-property check.crcs=false \
8+
--consumer-property partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor \
9+
--consumer-property security.protocol=SASL_PLAINTEXT \
10+
--consumer-property sasl.mechanism=PLAIN \
11+
--consumer-property "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<password>\";"
12+
```
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
2+
```bash
3+
kafka-console-producer --broker-list localhost:9092 --topic my-topic
4+
```
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
2+
```bash
3+
kcat -C \
4+
-b <ydb-endpoint> \
5+
-X check.crcs=false \
6+
-X partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor \
7+
-G <consumer-name> <topic-name>
8+
```
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
2+
```bash
3+
kcat -C \
4+
-b localhost:9092 \
5+
-X security.protocol=SASL_PLAINTEXT \
6+
-X sasl.mechanism=PLAIN \
7+
-X sasl.username="<username>" \
8+
-X sasl.password="<password>" \
9+
-X check.crcs=false \
10+
-X partition.assignment.strategy=roundrobin \
11+
-G <consumer-name> <topic-name>
12+
```
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
2+
```bash
3+
echo "test message" | kcat -P \
4+
-b <ydb-endpoint> \
5+
-t <topic-name> \
6+
-k key
7+
```
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
2+
```java
3+
public class YdbKafkaApiReadExample {
4+
5+
public static void main(String[] args) throws Exception {
6+
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
7+
.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
8+
9+
Configuration config = new Configuration();
10+
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
11+
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///path/to/your/checkpoints");
12+
env.configure(config);
13+
14+
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
15+
.setBootstrapServers("localhost:9092")
16+
.setProperty(ConsumerConfig.CHECK_CRCS_CONFIG, "false")
17+
.setGroupId("flink-demo-consumer")
18+
.setTopics("my-topic")
19+
.setStartingOffsets(OffsetsInitializer.earliest())
20+
.setBounded(OffsetsInitializer.latest())
21+
.setValueOnlyDeserializer(new SimpleStringSchema())
22+
.build();
23+
24+
env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source").print();
25+
26+
env.execute("YDB Kafka API example read app");
27+
}
28+
}
29+
```
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
2+
```java
3+
public class YdbKafkaApiProduceExample {
4+
private static final String TOPIC = "my-topic";
5+
6+
public static void main(String[] args) throws Exception {
7+
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
8+
9+
Sink<String> kafkaSink = KafkaSink.<String>builder()
10+
.setBootstrapServers("localhost:9092") // assuming ydb is running locally with kafka proxy on 9092 port
11+
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
12+
.setTopic(TOPIC)
13+
.setValueSerializationSchema(new SimpleStringSchema())
14+
.setKeySerializationSchema(new SimpleStringSchema())
15+
.build())
16+
.setRecordSerializer((el, ctx, ts) -> new ProducerRecord<>(TOPIC, el.getBytes()))
17+
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
18+
.build();
19+
20+
env.setParallelism(1)
21+
.fromSequence(0, 10)
22+
.map(i -> i + "")
23+
.sinkTo(kafkaSink);
24+
25+
// Execute program, beginning computation.
26+
env.execute("ydb_kafka_api_write_example");
27+
}
28+
}
29+
```
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
2+
```java
3+
String HOST = "<ydb-endpoint>";
4+
String TOPIC = "<topic-name>";
5+
String CONSUMER = "<consumer-name>";
6+
7+
Properties props = new Properties();
8+
9+
props.put("bootstrap.servers", HOST);
10+
11+
props.put("key.deserializer", StringDeserializer.class.getName());
12+
props.put("value.deserializer", StringDeserializer.class.getName());
13+
14+
props.put("check.crcs", false);
15+
props.put("partition.assignment.strategy", RoundRobinAssignor.class.getName());
16+
17+
props.put("group.id", CONSUMER);
18+
Consumer<String, String> consumer = new KafkaConsumer<>(props);
19+
consumer.subscribe(Arrays.asList(new String[] {TOPIC}));
20+
21+
while (true) {
22+
ConsumerRecords<String, String> records = consumer.poll(10000); // timeout 10 sec
23+
for (ConsumerRecord<String, String> record : records) {
24+
System.out.println(record.key() + ":" + record.value());
25+
}
26+
}
27+
```
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
2+
```java
3+
String TOPIC = "<topic-name>";
4+
String CONSUMER = "<consumer-name>";
5+
6+
Properties props = new Properties();
7+
8+
props.put("bootstrap.servers", "localhost:9092");
9+
10+
props.put("key.deserializer", StringDeserializer.class.getName());
11+
props.put("value.deserializer", StringDeserializer.class.getName());
12+
13+
props.put("check.crcs", false);
14+
props.put("partition.assignment.strategy", RoundRobinAssignor.class.getName());
15+
16+
props.put("security.protocol", "SASL_PLAINTEXT");
17+
props.put("sasl.mechanism", "PLAIN");
18+
props.put("sasl.jaas.config", "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<password>\";");
19+
20+
props.put("group.id", CONSUMER);
21+
Consumer<String, String> consumer = new KafkaConsumer<>(props);
22+
consumer.subscribe(Arrays.asList(new String[] {TOPIC}));
23+
24+
while (true) {
25+
ConsumerRecords<String, String> records = consumer.poll(10000); // timeout 10 sec
26+
for (ConsumerRecord<String, String> record : records) {
27+
System.out.println(record.key() + ":" + record.value());
28+
}
29+
}
30+
```

0 commit comments

Comments
 (0)