Skip to content

Commit 071f9d7

Browse files
authored
Add support for KeyValue in Pulsar (#627)
1 parent 0002679 commit 071f9d7

File tree

3 files changed

+117
-26
lines changed

3 files changed

+117
-26
lines changed

langstream-pulsar-runtime/src/main/java/ai/langstream/pulsar/runner/PulsarTopicConnectionsRuntimeProvider.java

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@
7070
import org.apache.pulsar.client.api.Schema;
7171
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
7272
import org.apache.pulsar.client.api.SubscriptionType;
73+
import org.apache.pulsar.client.api.TypedMessageBuilder;
7374
import org.apache.pulsar.client.api.schema.GenericRecord;
75+
import org.apache.pulsar.client.api.schema.KeyValueSchema;
7476
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
7577
import org.apache.pulsar.common.naming.TopicName;
7678
import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -623,7 +625,12 @@ public void start() {
623625
mapper.convertValue(
624626
configuration.remove("keySchema"), SchemaDefinition.class);
625627
Schema<?> keySchema = Schema.getSchema(getSchemaInfo(keySchemaDefinition));
626-
schema = (Schema<K>) Schema.KeyValue(keySchema, valueSchema);
628+
schema =
629+
(Schema<K>)
630+
Schema.KeyValue(
631+
keySchema,
632+
valueSchema,
633+
KeyValueEncodingType.SEPARATED);
627634
} else {
628635
schema = (Schema<K>) valueSchema;
629636
}
@@ -668,7 +675,12 @@ public CompletableFuture<?> write(Record r) {
668675
Schema<?> valueSchema = getSchema(r.value().getClass());
669676
if (r.key() != null) {
670677
Schema<?> keySchema = getSchema(r.key().getClass());
671-
schema = (Schema<K>) Schema.KeyValue(keySchema, valueSchema);
678+
schema =
679+
(Schema<K>)
680+
Schema.KeyValue(
681+
keySchema,
682+
valueSchema,
683+
KeyValueEncodingType.SEPARATED);
672684
} else {
673685
schema = (Schema<K>) valueSchema;
674686
}
@@ -683,36 +695,48 @@ public CompletableFuture<?> write(Record r) {
683695
}
684696

685697
log.info("Writing message {}", r);
686-
// TODO: handle KV
687-
688-
return producer.newMessage()
689-
.key(r.key() != null ? r.key().toString() : null)
690-
.value(convertValue(r))
691-
.properties(
692-
r.headers().stream()
693-
.collect(
694-
Collectors.toMap(
695-
Header::key,
696-
h ->
697-
h.value() != null
698-
? h.value().toString()
699-
: null)))
700-
.sendAsync();
701-
}
702-
703-
private K convertValue(Record r) {
704-
Object value = r.value();
698+
699+
TypedMessageBuilder<K> message =
700+
producer.newMessage()
701+
.properties(
702+
r.headers().stream()
703+
.collect(
704+
Collectors.toMap(
705+
Header::key,
706+
h ->
707+
h.value() != null
708+
? h.value()
709+
.toString()
710+
: null)));
711+
712+
if (schema instanceof KeyValueSchema<?, ?> keyValueSchema) {
713+
KeyValue<?, ?> keyValue =
714+
new KeyValue<>(
715+
convertValue(r.key(), keyValueSchema.getKeySchema()),
716+
convertValue(r.value(), keyValueSchema.getValueSchema()));
717+
message.value((K) keyValue);
718+
} else {
719+
if (r.key() != null) {
720+
message.key(r.key().toString());
721+
}
722+
message.value((K) convertValue(r.value(), schema));
723+
}
724+
725+
return message.sendAsync();
726+
}
727+
728+
private Object convertValue(Object value, Schema<?> schema) {
705729
if (value == null) {
706730
return null;
707731
}
708732
switch (schema.getSchemaInfo().getType()) {
709733
case BYTES:
710734
if (value instanceof byte[]) {
711-
return (K) value;
735+
return value;
712736
}
713-
return (K) value.toString().getBytes(StandardCharsets.UTF_8);
737+
return value.toString().getBytes(StandardCharsets.UTF_8);
714738
case STRING:
715-
return (K) value.toString();
739+
return value.toString();
716740
default:
717741
throw new IllegalArgumentException(
718742
"Unsupported output schema type " + schema);

langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,6 @@ def to_grpc_record(self, record: Record) -> Tuple[List[Schema], GrpcRecord]:
287287
def to_grpc_value(self, value) -> Tuple[Optional[Schema], Optional[Value]]:
288288
if value is None:
289289
return None, None
290-
# TODO: define a python type for Avro
291290
grpc_value = Value()
292291
grpc_schema = None
293292
if isinstance(value, bytes):

langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/pulsar/PulsarRunnerDockerTest.java

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
1919
import static org.junit.jupiter.api.Assertions.assertEquals;
20+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
2021

2122
import ai.langstream.AbstractApplicationRunner;
2223
import ai.langstream.kafka.AbstractKafkaApplicationRunner;
@@ -34,6 +35,8 @@
3435
import org.apache.pulsar.client.api.PulsarClientException;
3536
import org.apache.pulsar.client.api.Schema;
3637
import org.apache.pulsar.client.api.schema.GenericRecord;
38+
import org.apache.pulsar.common.schema.KeyValue;
39+
import org.apache.pulsar.common.schema.KeyValueEncodingType;
3740
import org.awaitility.Awaitility;
3841
import org.junit.jupiter.api.Test;
3942
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -148,6 +151,66 @@ public void testTopicSchema() throws Exception {
148151
}
149152
}
150153

154+
@Test
155+
public void testKeyValueSchema() throws Exception {
156+
String tenant = "topic-schema";
157+
String[] expectedAgents = {"app-step1"};
158+
String inputTopic = "input-topic-" + UUID.randomUUID();
159+
String outputTopic = "output-topic-" + UUID.randomUUID();
160+
161+
Map<String, String> application =
162+
Map.of(
163+
"module.yaml",
164+
"""
165+
module: "module-1"
166+
id: "pipeline-1"
167+
topics:
168+
- name: "%s"
169+
creation-mode: create-if-not-exists
170+
schema:
171+
type: "string"
172+
keySchema:
173+
type: "string"
174+
- name: "%s"
175+
creation-mode: create-if-not-exists
176+
schema:
177+
type: "string"
178+
keySchema:
179+
type: "string"
180+
pipeline:
181+
- id: "step1"
182+
type: "identity"
183+
input: "%s"
184+
output: "%s"
185+
"""
186+
.formatted(inputTopic, outputTopic, inputTopic, outputTopic));
187+
188+
try (ApplicationRuntime applicationRuntime =
189+
deployApplication(
190+
tenant, "app", application, buildInstanceYaml(), expectedAgents)) {
191+
try (Producer<KeyValue<String, String>> producer =
192+
createProducer(
193+
inputTopic,
194+
Schema.KeyValue(
195+
Schema.STRING,
196+
Schema.STRING,
197+
KeyValueEncodingType.SEPARATED));
198+
Consumer<GenericRecord> consumer = createConsumer(outputTopic)) {
199+
200+
producer.newMessage().value(new KeyValue<>("key", "value")).send();
201+
producer.flush();
202+
203+
executeAgentRunners(applicationRuntime);
204+
205+
Message<GenericRecord> record = consumer.receive(30, TimeUnit.SECONDS);
206+
Object value = record.getValue().getNativeObject();
207+
assertInstanceOf(KeyValue.class, value);
208+
assertEquals("key", ((KeyValue<?, ?>) value).getKey());
209+
assertEquals("value", ((KeyValue<?, ?>) value).getValue());
210+
}
211+
}
212+
}
213+
151214
@Test
152215
public void testDeadLetter() throws Exception {
153216
String tenant = "tenant";
@@ -224,7 +287,12 @@ private String buildInstanceYaml() {
224287
}
225288

226289
protected Producer<String> createProducer(String topic) throws PulsarClientException {
227-
return pulsarContainer.getClient().newProducer(Schema.STRING).topic(topic).create();
290+
return createProducer(topic, Schema.STRING);
291+
}
292+
293+
protected <T> Producer<T> createProducer(String topic, Schema<T> schema)
294+
throws PulsarClientException {
295+
return pulsarContainer.getClient().newProducer(schema).topic(topic).create();
228296
}
229297

230298
protected Consumer<GenericRecord> createConsumer(String topic) throws PulsarClientException {

0 commit comments

Comments
 (0)