diff --git a/build.gradle.kts b/build.gradle.kts
index c6f7a6d36..74268e8f3 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -11,7 +11,7 @@ allprojects {
     repositories {
         mavenCentral()
         maven(url = "https://packages.confluent.io/maven/")
-        maven(url = "https://s01.oss.sonatype.org/content/repositories/snapshots")
+        maven(url = "https://central.sonatype.com/repository/maven-snapshots")
     }
 }
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 2b3826a2b..4fbbec855 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -4,7 +4,7 @@
 mockito = "5.17.0"
 testcontainers = "1.21.0"

 [libraries]
-kafka-streams-utils = { group = "com.bakdata.kafka", name = "kafka-streams-utils", version = "1.1.1" }
+kafka-streams-utils = { group = "com.bakdata.kafka", name = "kafka-streams-utils", version = "1.2.0" }
 kafka-tools = { group = "org.apache.kafka", name = "kafka-tools" }
 kafka-streams = { group = "org.apache.kafka", name = "kafka-streams" }
 kafka-clients = { group = "org.apache.kafka", name = "kafka-clients" }
diff --git a/streams-bootstrap-cli-test/src/test/resources/log4j2.xml b/streams-bootstrap-cli-test/src/test/resources/log4j2.xml
index a3089a43a..aed046ea1 100644
--- a/streams-bootstrap-cli-test/src/test/resources/log4j2.xml
+++ b/streams-bootstrap-cli-test/src/test/resources/log4j2.xml
@@ -27,6 +27,15 @@
+
+
+
+
+
+
+
+
+
diff --git a/streams-bootstrap-cli/src/test/resources/log4j2.xml b/streams-bootstrap-cli/src/test/resources/log4j2.xml
index 0d4071ce2..dfbe33f8d 100644
--- a/streams-bootstrap-cli/src/test/resources/log4j2.xml
+++ b/streams-bootstrap-cli/src/test/resources/log4j2.xml
@@ -30,5 +30,14 @@
+
+
+
+
+
+
+
+
+
diff --git a/streams-bootstrap-core/build.gradle.kts b/streams-bootstrap-core/build.gradle.kts
index a608ea643..4b7b1de85 100644
--- a/streams-bootstrap-core/build.gradle.kts
+++ b/streams-bootstrap-core/build.gradle.kts
@@ -7,9 +7,7 @@ plugins {

 dependencies {
     api(libs.kafka.streams.utils)
-    implementation(libs.kafka.tools) {
-        exclude(group = "org.slf4j", module = "slf4j-reload4j")
-    }
+    implementation(libs.kafka.tools)

     api(libs.kafka.streams)
     api(libs.kafka.clients)
diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java
index 13c3d86ff..915f8bb07 100644
--- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java
+++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java
@@ -32,6 +32,7 @@
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.record.CompressionType;

 /**
  * A {@link ProducerApp} with a corresponding {@link ProducerTopicConfig}
@@ -50,7 +51,7 @@ private static Map<String, Object> createBaseConfig() {
         kafkaConfig.put(ProducerConfig.ACKS_CONFIG, "all");

         // compression
-        kafkaConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
+        kafkaConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.GZIP.toString());

         return kafkaConfig;
     }
diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java
index 14a1dfa91..a139678d8 100644
--- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java
+++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java
@@ -31,6 +31,7 @@
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;

@@ -54,7 +55,8 @@ private static Map<String, Object> createBaseConfig() {
         kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");

         // compression
-        kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "gzip");
+        kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG),
+                CompressionType.GZIP.toString());

         return kafkaConfig;
     }
diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConsumedX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConsumedX.java
index e530ba3ed..02baaff91 100644
--- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConsumedX.java
+++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConsumedX.java
@@ -26,7 +26,8 @@

 import java.util.function.Function;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.Topology.AutoOffsetReset;
+import org.apache.kafka.streams.AutoOffsetReset;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.processor.TimestampExtractor;

@@ -109,6 +110,14 @@ public static <K, V> ConsumedX<K, V> with(final TimestampExtractor timestampExtractor) {
         return new ConsumedX<>(configurator -> Consumed.with(timestampExtractor));
     }

+    /**
+     * @see Consumed#with(Topology.AutoOffsetReset)
+     */
+    @Deprecated(since = "5.0.0")
+    public static <K, V> ConsumedX<K, V> with(final Topology.AutoOffsetReset resetPolicy) {
+        return new ConsumedX<>(configurator -> Consumed.with(resetPolicy));
+    }
+
     /**
      * @see Consumed#with(AutoOffsetReset)
      */
@@ -159,6 +168,14 @@ public ConsumedX<K, V> withOffsetResetPolicy(final AutoOffsetReset offsetResetPolicy) {
         return this.modify(consumed -> consumed.withOffsetResetPolicy(offsetResetPolicy));
     }

+    /**
+     * @see Consumed#withOffsetResetPolicy(Topology.AutoOffsetReset)
+     */
+    @Deprecated(since = "5.0.0")
+    public ConsumedX<K, V> withOffsetResetPolicy(final Topology.AutoOffsetReset offsetResetPolicy) {
+        return this.modify(consumed -> consumed.withOffsetResetPolicy(offsetResetPolicy));
+    }
+
     /**
      * @see Consumed#withTimestampExtractor(TimestampExtractor)
      */
diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KStreamX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KStreamX.java
index 86082a0f4..160cfc36e 100644
--- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KStreamX.java
+++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KStreamX.java
@@ -40,13 +40,10 @@
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Repartitioned;
 import org.apache.kafka.streams.kstream.StreamJoined;
-import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
@@ -479,14 +476,6 @@ <VR> KErrorStreamX<K, V, VR> flatMapValuesCapturingErrors(
     @Override
     KStreamX<K, V> peek(ForeachAction<? super K, ? super V> action, Named named);

-    @Deprecated
-    @Override
-    KStreamX<K, V>[] branch(Named named, Predicate<? super K, ? super V>... predicates);
-
-    @Deprecated
-    @Override
-    KStreamX<K, V>[] branch(Predicate<? super K, ? super V>... predicates);
-
     @Override
     BranchedKStreamX<K, V> split();

@@ -499,14 +488,6 @@ <VR> KErrorStreamX<K, V, VR> flatMapValuesCapturingErrors(
     @Override
     KStreamX<K, V> merge(KStream<K, V> stream, Named named);

-    @Deprecated
-    @Override
-    KStreamX<K, V> through(String topic);
-
-    @Deprecated
-    @Override
-    KStreamX<K, V> through(String topic, Produced<K, V> produced);
-
     @Override
     KStreamX<K, V> repartition();

@@ -833,78 +814,6 @@ <GK, GV, RV> KStreamX<K, RV> leftJoin(GlobalKTable<GK, GV> globalTable,
     <GK, GV, RV> KStreamX<K, RV> leftJoin(GlobalKTable<GK, GV> globalTable,
             KeyValueMapper<? super K, ? super V, ? extends GK> keySelector,
             ValueJoinerWithKey<? super K, ? super V, ? super GV, ? extends RV> valueJoiner, Named named);

-    @Deprecated
-    @Override
-    <K1, V1> KStreamX<K1, V1> transform(
-            TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
-            String... stateStoreNames);
-
-    @Deprecated
-    @Override
-    <K1, V1> KStreamX<K1, V1> transform(
-            TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
-            Named named, String... stateStoreNames);
-
-    @Deprecated
-    @Override
-    <K1, V1> KStreamX<K1, V1> flatTransform(
-            TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
-            String... stateStoreNames);
-
-    @Deprecated
-    @Override
-    <K1, V1> KStreamX<K1, V1> flatTransform(
-            TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier, Named named,
-            String... stateStoreNames);
-
-    @Deprecated
-    @Override
-    <VR> KStreamX<K, VR> transformValues(
-            ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
-            String... stateStoreNames);
-
-    @Deprecated
-    @Override
-    <VR> KStreamX<K, VR> transformValues(
-            ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
-            Named named, String... stateStoreNames);
-
-    @Deprecated
-    @Override
-    <VR> KStreamX<K, VR> transformValues(
-            ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
-            String... stateStoreNames);
-
-    @Deprecated
-    @Override
-    <VR> KStreamX<K, VR> transformValues(
-            ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier, Named named,
-            String... stateStoreNames);
-
-    @Deprecated
-    @Override
-    <VR> KStreamX<K, VR> flatTransformValues(
-            ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
-            String... stateStoreNames);
-
-    @Deprecated
-    @Override
-    <VR> KStreamX<K, VR> flatTransformValues(
-            ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
-            Named named, String... stateStoreNames);
-
-    @Deprecated
-    @Override
-    <VR> KStreamX<K, VR> flatTransformValues(
-            ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier,
-            String... stateStoreNames);
-
-    @Deprecated
-    @Override
-    <VR> KStreamX<K, VR> flatTransformValues(
-            ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier, Named named,
-            String... stateStoreNames);
-
     @Override
     <KOut, VOut> KStreamX<KOut, VOut> process(
             ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier,
diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KStreamXImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KStreamXImpl.java
index b815ff320..bf307c8a1 100644
--- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KStreamXImpl.java
+++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KStreamXImpl.java
@@ -24,7 +24,6 @@
 package com.bakdata.kafka;

-import java.util.Arrays;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.NonNull;
@@ -46,13 +45,10 @@
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Repartitioned;
 import org.apache.kafka.streams.kstream.StreamJoined;
-import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
@@ -369,20 +365,6 @@ public KStreamX<K, V> peek(final ForeachAction<? super K, ? super V> action, final Named named) {
         return this.context.wrap(this.wrapped.peek(action, named));
     }

-    @Override
-    public KStreamX<K, V>[] branch(final Predicate<? super K, ? super V>... predicates) {
-        return Arrays.stream(this.wrapped.branch(predicates))
-                .map(this.context::wrap)
-                .toArray(KStreamX[]::new);
-    }
-
-    @Override
-    public KStreamX<K, V>[] branch(final Named named, final Predicate<? super K, ? super V>... predicates) {
-        return Arrays.stream(this.wrapped.branch(named, predicates))
-                .map(this.context::wrap)
-                .toArray(KStreamX[]::new);
-    }
-
     @Override
     public BranchedKStreamX<K, V> split() {
         return this.context.wrap(this.wrapped.split());
@@ -405,16 +387,6 @@ public KStreamX<K, V> merge(final KStream<K, V> stream, final Named named) {
         return this.context.wrap(this.wrapped.merge(other, named));
     }

-    @Override
-    public KStreamX<K, V> through(final String topic) {
-        return this.context.wrap(this.wrapped.through(topic));
-    }
-
-    @Override
-    public KStreamX<K, V> through(final String topic, final Produced<K, V> produced) {
-        return this.context.wrap(this.wrapped.through(topic, produced));
-    }
-
     @Override
     public KStreamX<K, V> repartition() {
         return this.context.wrap(this.wrapped.repartition());
@@ -844,109 +816,6 @@ public <GK, GV, RV> KStreamX<K, RV> leftJoin(final GlobalKTable<GK, GV> globalTable,
         return this.context.wrap(this.wrapped.leftJoin(globalTable, keySelector, valueJoiner, named));
     }

-    @Override
-    public <K1, V1> KStreamX<K1, V1> transform(
-            final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
-            final String... stateStoreNames) {
-        return this.context.wrap(this.wrapped.transform(transformerSupplier, stateStoreNames));
-    }
-
-    @Override
-    public <K1, V1> KStreamX<K1, V1> transform(
-            final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier, final Named named,
-            final String... stateStoreNames) {
-        return this.context.wrap(this.wrapped.transform(transformerSupplier, named, stateStoreNames));
-    }
-
-    @Override
-    public <K1, V1> KStreamX<K1, V1> flatTransform(
-            final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
-            final String... stateStoreNames) {
-        return this.context.wrap(this.wrapped.flatTransform(transformerSupplier, stateStoreNames));
-    }
-
-    @Override
-    public <K1, V1> KStreamX<K1, V1> flatTransform(
-            final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
-            final Named named,
-            final String... stateStoreNames) {
-        return this.context.wrap(this.wrapped.flatTransform(transformerSupplier, named, stateStoreNames));
-    }
-
-    @Override
-    public <VR> KStreamX<K, VR> transformValues(
-            final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
-            final String... stateStoreNames) {
-        return this.context.wrap(this.wrapped.transformValues(valueTransformerSupplier, stateStoreNames));
-    }
-
-    @Override
-    public <VR> KStreamX<K, VR> transformValues(
-            final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier, final Named named,
-            final String... stateStoreNames) {
-        return this.context.wrap(this.wrapped.transformValues(valueTransformerSupplier, named, stateStoreNames));
-    }
-
-    @Override
-    public <VR> KStreamX<K, VR> transformValues(
-            final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
-            final String... stateStoreNames) {
-        return this.context.wrap(this.wrapped.transformValues(valueTransformerSupplier, stateStoreNames));
-    }
-
-    @Override
-    public <VR> KStreamX<K, VR> transformValues(
-            final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
-            final Named named,
-            final String... stateStoreNames) {
-        return this.context.wrap(this.wrapped.transformValues(valueTransformerSupplier, named, stateStoreNames));
-    }
-
-    @Override
-    public <VR> KStreamX<K, VR> flatTransformValues(
-            final ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
-            final String... stateStoreNames) {
-        return this.context.wrap(this.wrapped.flatTransformValues(valueTransformerSupplier, stateStoreNames));
-    }
-
-    @Override
-    public <VR> KStreamX<K, VR> flatTransformValues(
-            final ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier, final Named named,
-            final String... stateStoreNames) {
-        return this.context.wrap(
-                this.wrapped.flatTransformValues(valueTransformerSupplier, named, stateStoreNames));
-    }
-
-    @Override
-    public <VR> KStreamX<K, VR> flatTransformValues(
-            final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier,
-            final String... stateStoreNames) {
-        return this.context.wrap(this.wrapped.flatTransformValues(valueTransformerSupplier, stateStoreNames));
-    }
-
-    @Override
-    public <VR> KStreamX<K, VR> flatTransformValues(
-            final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier,
-            final Named named,
-            final String... stateStoreNames) {
-        return this.context.wrap(
-                this.wrapped.flatTransformValues(valueTransformerSupplier, named, stateStoreNames));
-    }
-
-    @Override
-    public void process(
-            final org.apache.kafka.streams.processor.ProcessorSupplier<? super K, ? super V> processorSupplier,
-            final String... stateStoreNames) {
-        this.wrapped.process(processorSupplier, stateStoreNames);
-    }
-
-    @Override
-    public void process(
-            final org.apache.kafka.streams.processor.ProcessorSupplier<? super K, ? super V> processorSupplier,
-            final Named named, final String... stateStoreNames) {
-        this.wrapped.process(processorSupplier, named, stateStoreNames);
-    }
-
     @Override
     public <KOut, VOut> KStreamX<KOut, VOut> process(
             final ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier,
diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KTableX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KTableX.java
index ac6377d60..3cbe9ec06 100644
--- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KTableX.java
+++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KTableX.java
@@ -24,6 +24,7 @@
 package com.bakdata.kafka;

+import java.util.function.BiFunction;
 import java.util.function.Function;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
@@ -313,11 +314,6 @@ <VO, VR> KTableX<K, VR> outerJoin(KTable<K, VO> other,
     <VR, KO, VO> KTableX<K, VR> join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor,
             ValueJoiner<V, VO, VR> joiner);

-    @Deprecated
-    @Override
-    <VR, KO, VO> KTableX<K, VR> join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor,
-            ValueJoiner<V, VO, VR> joiner, Named named);
-
     @Override
     <VR, KO, VO> KTableX<K, VR> join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor,
             ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined);
@@ -332,11 +328,6 @@ <VR, KO, VO> KTableX<K, VR> join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor,
     <VR, KO, VO> KTableX<K, VR> join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor,
             ValueJoiner<V, VO, VR> joiner, MaterializedX<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

-    @Deprecated
-    @Override
-    <VR, KO, VO> KTableX<K, VR> join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor,
-            ValueJoiner<V, VO, VR> joiner, Named named, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
-
     @Override
     <VR, KO, VO> KTableX<K, VR> join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor,
             ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined,
@@ -353,11 +344,6 @@ <VR, KO, VO> KTableX<K, VR> join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor,
     <VR, KO, VO> KTableX<K, VR> leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor,
             ValueJoiner<V, VO, VR> joiner);

-    @Deprecated
-    @Override
-    <VR, KO, VO> KTableX<K, VR> leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor,
-            ValueJoiner<V, VO, VR> joiner, Named named);
-
     @Override
     <VR, KO, VO> KTableX<K, VR> leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor,
             ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined);
@@ -372,11 +358,6 @@ <VR, KO, VO> KTableX<K, VR> leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor,
     <VR, KO, VO> KTableX<K, VR> leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor,
             ValueJoiner<V, VO, VR> joiner, MaterializedX<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

-    @Deprecated
-    @Override
-    <VR, KO, VO> KTableX<K, VR> leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor,
-            ValueJoiner<V, VO, VR> joiner, Named named, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
-
     @Override
     <VR, KO, VO> KTableX<K, VR> leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor,
             ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined,
@@ -388,4 +369,64 @@ <VR, KO, VO> KTableX<K, VR> leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor,
     <VR, KO, VO> KTableX<K, VR> leftJoin(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor,
             ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined,
             MaterializedX<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
+
+    @Override
+    <VR, KO, VO> KTableX<K, VR> join(KTable<KO, VO> other, BiFunction<K, V, KO> foreignKeyExtractor,
+            ValueJoiner<V, VO, VR> joiner);
+
+    @Override
+    <VR, KO, VO> KTableX<K, VR> join(KTable<KO, VO> other, BiFunction<K, V, KO> foreignKeyExtractor,
+            ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined);
+
+    @Override
+    <VR, KO, VO> KTableX<K, VR> join(KTable<KO, VO> other, BiFunction<K, V, KO> foreignKeyExtractor,
+            ValueJoiner<V, VO, VR> joiner, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
+
+    /**
+     * @see #join(KTable, BiFunction, ValueJoiner, Materialized)
+     */
+    <VR, KO, VO> KTableX<K, VR> join(KTable<KO, VO> other, BiFunction<K, V, KO> foreignKeyExtractor,
+            ValueJoiner<V, VO, VR> joiner, MaterializedX<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
+
+    @Override
+    <VR, KO, VO> KTableX<K, VR> join(KTable<KO, VO> other, BiFunction<K, V, KO> foreignKeyExtractor,
+            ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined,
+            Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
+
+    /**
+     * @see #join(KTable, BiFunction, ValueJoiner, TableJoined, Materialized)
+     */
+    <VR, KO, VO> KTableX<K, VR> join(KTable<KO, VO> other, BiFunction<K, V, KO> foreignKeyExtractor,
+            ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined,
+            MaterializedX<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
+
+    @Override
+    <VR, KO, VO> KTableX<K, VR> leftJoin(KTable<KO, VO> other, BiFunction<K, V, KO> foreignKeyExtractor,
+            ValueJoiner<V, VO, VR> joiner);
+
+    @Override
+    <VR, KO, VO> KTableX<K, VR> leftJoin(KTable<KO, VO> other, BiFunction<K, V, KO> foreignKeyExtractor,
+            ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined);
+
+    @Override
+    <VR, KO, VO> KTableX<K, VR> leftJoin(KTable<KO, VO> other, BiFunction<K, V, KO> foreignKeyExtractor,
+            ValueJoiner<V, VO, VR> joiner, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
+
+    /**
+     * @see #leftJoin(KTable, BiFunction, ValueJoiner, Materialized)
+     */
+    <VR, KO, VO> KTableX<K, VR> leftJoin(KTable<KO, VO> other, BiFunction<K, V, KO> foreignKeyExtractor,
+            ValueJoiner<V, VO, VR> joiner, MaterializedX<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
+
+    @Override
+    <VR, KO, VO> KTableX<K, VR> leftJoin(KTable<KO, VO> other, BiFunction<K, V, KO> foreignKeyExtractor,
+            ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined,
+            Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
+
+    /**
+     * @see #leftJoin(KTable, BiFunction, ValueJoiner, TableJoined, Materialized)
+     */
+    <VR, KO, VO> KTableX<K, VR> leftJoin(KTable<KO, VO> other, BiFunction<K, V, KO> foreignKeyExtractor,
+            ValueJoiner<V, VO, VR> joiner, TableJoined<K, KO> tableJoined,
+            MaterializedX<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
 }
diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KTableXImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KTableXImpl.java
index 73ac9744b..e7ca61e4c 100644
--- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KTableXImpl.java
+++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KTableXImpl.java
@@ -24,6 +24,7 @@
 package com.bakdata.kafka;

+import java.util.function.BiFunction;
 import java.util.function.Function;
 import lombok.AccessLevel;
 import lombok.Getter;
@@ -422,14 +423,6 @@ public <VR, KO, VO> KTableX<K, VR> join(final KTable<KO, VO> other,
         return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner));
     }

-    @Override
-    public <VR, KO, VO> KTableX<K, VR> join(final KTable<KO, VO> other,
-            final Function<V, KO> foreignKeyExtractor,
-            final ValueJoiner<V, VO, VR> joiner, final Named named) {
-        final KTable<KO, VO> otherTable = StreamsContext.maybeUnwrap(other);
-        return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner, named));
-    }
-
     @Override
     public <VR, KO, VO> KTableX<K, VR> join(final KTable<KO, VO> other,
             final Function<V, KO> foreignKeyExtractor,
@@ -454,15 +447,6 @@ public <VR, KO, VO> KTableX<K, VR> join(final KTable<KO, VO> other,
         return this.join(other, foreignKeyExtractor, joiner, materialized.configure(this.context.getConfigurator()));
     }

-    @Override
-    public <VR, KO, VO> KTableX<K, VR> join(final KTable<KO, VO> other,
-            final Function<V, KO> foreignKeyExtractor,
-            final ValueJoiner<V, VO, VR> joiner, final Named named,
-            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
-        final KTable<KO, VO> otherTable = StreamsContext.maybeUnwrap(other);
-        return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner, named, materialized));
-    }
-
     @Override
     public <VR, KO, VO> KTableX<K, VR> join(final KTable<KO, VO> other,
             final Function<V, KO> foreignKeyExtractor,
@@ -489,14 +473,6 @@ public <VR, KO, VO> KTableX<K, VR> leftJoin(final KTable<KO, VO> other,
         return this.context.wrap(this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner));
     }

-    @Override
-    public <VR, KO, VO> KTableX<K, VR> leftJoin(final KTable<KO, VO> other,
-            final Function<V, KO> foreignKeyExtractor,
-            final ValueJoiner<V, VO, VR> joiner, final Named named) {
-        final KTable<KO, VO> otherTable = StreamsContext.maybeUnwrap(other);
-        return this.context.wrap(this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner, named));
-    }
-
     @Override
     public <VR, KO, VO> KTableX<K, VR> leftJoin(final KTable<KO, VO> other,
             final Function<V, KO> foreignKeyExtractor,
@@ -525,16 +501,103 @@ public <VR, KO, VO> KTableX<K, VR> leftJoin(final KTable<KO, VO> other,
     @Override
     public <VR, KO, VO> KTableX<K, VR> leftJoin(final KTable<KO, VO> other,
             final Function<V, KO> foreignKeyExtractor,
-            final ValueJoiner<V, VO, VR> joiner, final Named named,
+            final ValueJoiner<V, VO, VR> joiner, final TableJoined<K, KO> tableJoined,
             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
         final KTable<KO, VO> otherTable = StreamsContext.maybeUnwrap(other);
-        return this.context.wrap(this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner, named, materialized));
+        return this.context.wrap(
+                this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner, tableJoined, materialized));
     }

     @Override
     public <VR, KO, VO> KTableX<K, VR> leftJoin(final KTable<KO, VO> other,
             final Function<V, KO> foreignKeyExtractor,
             final ValueJoiner<V, VO, VR> joiner, final TableJoined<K, KO> tableJoined,
+            final MaterializedX<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+        return this.leftJoin(other, foreignKeyExtractor, joiner, tableJoined,
+                materialized.configure(this.context.getConfigurator()));
+    }
+
+    @Override
+    public <VR, KO, VO> KTableX<K, VR> join(final KTable<KO, VO> other, final BiFunction<K, V, KO> foreignKeyExtractor,
+            final ValueJoiner<V, VO, VR> joiner) {
+        final KTable<KO, VO> otherTable = StreamsContext.maybeUnwrap(other);
+        return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner));
+    }
+
+    @Override
+    public <VR, KO, VO> KTableX<K, VR> join(final KTable<KO, VO> other, final BiFunction<K, V, KO> foreignKeyExtractor,
+            final ValueJoiner<V, VO, VR> joiner, final TableJoined<K, KO> tableJoined) {
+        final KTable<KO, VO> otherTable = StreamsContext.maybeUnwrap(other);
+        return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner, tableJoined));
+    }
+
+    @Override
+    public <VR, KO, VO> KTableX<K, VR> join(final KTable<KO, VO> other, final BiFunction<K, V, KO> foreignKeyExtractor,
+            final ValueJoiner<V, VO, VR> joiner, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+        final KTable<KO, VO> otherTable = StreamsContext.maybeUnwrap(other);
+        return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner, materialized));
+    }
+
+    @Override
+    public <VR, KO, VO> KTableX<K, VR> join(final KTable<KO, VO> other, final BiFunction<K, V, KO> foreignKeyExtractor,
+            final ValueJoiner<V, VO, VR> joiner,
+            final MaterializedX<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+        return this.join(other, foreignKeyExtractor, joiner, materialized.configure(this.context.getConfigurator()));
+    }
+
+    @Override
+    public <VR, KO, VO> KTableX<K, VR> join(final KTable<KO, VO> other, final BiFunction<K, V, KO> foreignKeyExtractor,
+            final ValueJoiner<V, VO, VR> joiner, final TableJoined<K, KO> tableJoined,
+            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+        final KTable<KO, VO> otherTable = StreamsContext.maybeUnwrap(other);
+        return this.context.wrap(this.wrapped.join(otherTable, foreignKeyExtractor, joiner, tableJoined, materialized));
+    }
+
+    @Override
+    public <VR, KO, VO> KTableX<K, VR> join(final KTable<KO, VO> other, final BiFunction<K, V, KO> foreignKeyExtractor,
+            final ValueJoiner<V, VO, VR> joiner, final TableJoined<K, KO> tableJoined,
+            final MaterializedX<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+        return this.join(other, foreignKeyExtractor, joiner, tableJoined,
+                materialized.configure(this.context.getConfigurator()));
+    }
+
+    @Override
+    public <VR, KO, VO> KTableX<K, VR> leftJoin(final KTable<KO, VO> other,
+            final BiFunction<K, V, KO> foreignKeyExtractor,
+            final ValueJoiner<V, VO, VR> joiner) {
+        final KTable<KO, VO> otherTable = StreamsContext.maybeUnwrap(other);
+        return this.context.wrap(this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner));
+    }
+
+    @Override
+    public <VR, KO, VO> KTableX<K, VR> leftJoin(final KTable<KO, VO> other,
+            final BiFunction<K, V, KO> foreignKeyExtractor,
+            final ValueJoiner<V, VO, VR> joiner, final TableJoined<K, KO> tableJoined) {
+        final KTable<KO, VO> otherTable = StreamsContext.maybeUnwrap(other);
+        return this.context.wrap(this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner, tableJoined));
+    }
+
+    @Override
+    public <VR, KO, VO> KTableX<K, VR> leftJoin(final KTable<KO, VO> other,
+            final BiFunction<K, V, KO> foreignKeyExtractor,
+            final ValueJoiner<V, VO, VR> joiner, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+        final KTable<KO, VO> otherTable = StreamsContext.maybeUnwrap(other);
+        return this.context.wrap(this.wrapped.leftJoin(otherTable, foreignKeyExtractor, joiner, materialized));
+    }
+
+    @Override
+    public <VR, KO, VO> KTableX<K, VR> leftJoin(final KTable<KO, VO> other,
+            final BiFunction<K, V, KO> foreignKeyExtractor,
+            final ValueJoiner<V, VO, VR> joiner,
+            final MaterializedX<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+        return this.leftJoin(other, foreignKeyExtractor, joiner,
+                materialized.configure(this.context.getConfigurator()));
+    }
+
+    @Override
+    public <VR, KO, VO> KTableX<K, VR> leftJoin(final KTable<KO, VO> other,
+            final BiFunction<K, V, KO> foreignKeyExtractor,
+            final ValueJoiner<V, VO, VR> joiner, final TableJoined<K, KO> tableJoined,
             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
         final KTable<KO, VO> otherTable = StreamsContext.maybeUnwrap(other);
         return this.context.wrap(
@@ -543,7 +606,7 @@ public <VR, KO, VO> KTableX<K, VR> leftJoin(final KTable<KO, VO> other,
-            final Function<V, KO> foreignKeyExtractor,
+            final BiFunction<K, V, KO> foreignKeyExtractor,
             final ValueJoiner<V, VO, VR> joiner, final TableJoined<K, KO> tableJoined,
             final MaterializedX<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
         return this.leftJoin(other, foreignKeyExtractor, joiner, tableJoined,
diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java
index 97116e66c..43fd380c6 100644
--- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java
+++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/StreamsCleanUpRunner.java
@@ -91,11 +91,10 @@ public static StreamsCleanUpRunner create(final @NonNull Topology topology,
      * Streams Reset Tool
      *
      * @param inputTopics list of input topics of the streams app
-     * @param intermediateTopics list of intermediate topics of the streams app
      * @param allTopics list of all topics that exists in the Kafka cluster
      * @param streamsAppConfig configuration properties of the streams app
      */
-    public static void runResetter(final Collection<String> inputTopics, final Collection<String> intermediateTopics,
+    public static void runResetter(final Collection<String> inputTopics, final Collection<String> allTopics,
             final ImprovedStreamsConfig streamsAppConfig) {
         // StreamsResetter's internal AdminClient can only be configured with a properties file
         final String appId = streamsAppConfig.getAppId();
@@ -109,10 +108,6 @@ public static void runResetter(final Collection<String> inputTopics, final Collection<String> allTopics,
         if (!existingInputTopics.isEmpty()) {
             argList.addAll(List.of("--input-topics", String.join(",", existingInputTopics)));
         }
-        final Collection<String> existingIntermediateTopics = filterExistingTopics(intermediateTopics, allTopics);
-        if (!existingIntermediateTopics.isEmpty()) {
-            argList.addAll(List.of("--intermediate-topics", String.join(",", existingIntermediateTopics)));
-        }
         final String[] args = argList.toArray(String[]::new);
         final StreamsResetter resetter = new StreamsResetter();
         final int returnCode = resetter.execute(args);
@@ -203,14 +198,17 @@ private class Task {
         private void reset() {
             final Collection<String> allTopics = this.adminClient.getTopicClient().listTopics();
+            this.reset(allTopics);
+        }
+
+        private void reset(final Collection<String> allTopics) {
             final List<String> inputTopics =
                     StreamsCleanUpRunner.this.topologyInformation.getInputTopics(allTopics);
-            final List<String> intermediateTopics =
-                    StreamsCleanUpRunner.this.topologyInformation.getIntermediateTopics(allTopics);
-            runResetter(inputTopics, intermediateTopics, allTopics, StreamsCleanUpRunner.this.config);
+            runResetter(inputTopics, allTopics, StreamsCleanUpRunner.this.config);
             // the StreamsResetter is responsible for deleting internal topics
             StreamsCleanUpRunner.this.topologyInformation.getInternalTopics()
                     .forEach(this::resetInternalTopic);
+            this.deleteIntermediateTopics(allTopics);
             try (final KafkaStreams kafkaStreams = this.createStreams()) {
                 kafkaStreams.cleanUp();
             }
@@ -223,22 +221,26 @@ private KafkaStreams createStreams() {
         }

         private void cleanAndReset() {
-            this.reset();
-            this.clean();
+            final Collection<String> allTopics =
this.adminClient.getTopicClient().listTopics(); + this.reset(allTopics); + this.clean(allTopics); } - private void clean() { - this.deleteTopics(); + private void clean(final Collection allTopics) { + this.deleteOutputTopics(allTopics); this.deleteConsumerGroup(); StreamsCleanUpRunner.this.cleanHooks.runCleanHooks(); } - /** - * Delete output topics - */ - private void deleteTopics() { - final List externalTopics = StreamsCleanUpRunner.this.topologyInformation.getExternalSinkTopics(); - externalTopics.forEach(this::deleteTopic); + private void deleteIntermediateTopics(final Collection allTopics) { + final List intermediateTopics = + StreamsCleanUpRunner.this.topologyInformation.getIntermediateTopics(allTopics); + intermediateTopics.forEach(this::deleteTopic); + } + + private void deleteOutputTopics(final Collection allTopics) { + final List outputTopics = StreamsCleanUpRunner.this.topologyInformation.getOutputTopics(allTopics); + outputTopics.forEach(this::deleteTopic); } private void resetInternalTopic(final String topic) { diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConsumedXTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConsumedXTest.java index 3658c8d5b..b0e713ce8 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConsumedXTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ConsumedXTest.java @@ -38,7 +38,8 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.Topology.AutoOffsetReset; +import org.apache.kafka.streams.AutoOffsetReset; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyDescription.Node; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; @@ -273,7 +274,53 @@ void shouldUseOffsetResetPolicy() { @Override public void buildTopology(final StreamsBuilderX builder) { final KStreamX input = - builder.stream("input", ConsumedX.with(AutoOffsetReset.LATEST)); + builder.stream("input", ConsumedX.with(AutoOffsetReset.latest())); + input.to("output"); + } + }; + try (final KafkaContainer kafkaCluster = KafkaTest.newCluster()) { + kafkaCluster.start(); + final RuntimeConfiguration configuration = RuntimeConfiguration.create(kafkaCluster.getBootstrapServers()) + .withNoStateStoreCaching() + .withSessionTimeout(SESSION_TIMEOUT); + final KafkaTestClient testClient = new KafkaTestClient(configuration); + testClient.createTopic("input"); + testClient.createTopic("output"); + testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .to("input", List.of(new SimpleProducerRecord<>("foo", "bar"))); + try (final ConfiguredStreamsApp configuredApp = app.configureApp(); + final ExecutableStreamsApp executableApp = configuredApp + .withRuntimeConfiguration(configuration); + final StreamsRunner runner = executableApp.createRunner()) { + runAsync(runner); + KafkaTest.awaitActive(executableApp); + testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .to("input", List.of(new SimpleProducerRecord<>("baz", "qux"))); + KafkaTest.awaitProcessing(executableApp); + this.softly.assertThat(testClient.read() + 
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .from("output", POLL_TIMEOUT)) + .hasSize(1) + .anySatisfy(outputRecord -> { + this.softly.assertThat(outputRecord.key()).isEqualTo("baz"); + this.softly.assertThat(outputRecord.value()).isEqualTo("qux"); + }); + } + } + } + + @Test + void shouldUseLegacyOffsetResetPolicy() { + final StringApp app = new StringApp() { + @Override + public void buildTopology(final StreamsBuilderX builder) { + final KStreamX input = + builder.stream("input", ConsumedX.with(Topology.AutoOffsetReset.LATEST)); input.to("output"); } }; @@ -319,7 +366,53 @@ void shouldUseOffsetResetPolicyModifier() { @Override public void buildTopology(final StreamsBuilderX builder) { final KStreamX input = builder.stream("input", - ConsumedX.as("stream").withOffsetResetPolicy(AutoOffsetReset.LATEST)); + ConsumedX.as("stream").withOffsetResetPolicy(AutoOffsetReset.latest())); + input.to("output"); + } + }; + try (final KafkaContainer kafkaCluster = KafkaTest.newCluster()) { + kafkaCluster.start(); + final RuntimeConfiguration configuration = RuntimeConfiguration.create(kafkaCluster.getBootstrapServers()) + .withNoStateStoreCaching() + .withSessionTimeout(SESSION_TIMEOUT); + final KafkaTestClient testClient = new KafkaTestClient(configuration); + testClient.createTopic("input"); + testClient.createTopic("output"); + testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .to("input", List.of(new SimpleProducerRecord<>("foo", "bar"))); + try (final ConfiguredStreamsApp configuredApp = app.configureApp(); + final ExecutableStreamsApp executableApp = configuredApp + .withRuntimeConfiguration(configuration); + final StreamsRunner runner = executableApp.createRunner()) { + runAsync(runner); + KafkaTest.awaitActive(executableApp); + testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .to("input", List.of(new SimpleProducerRecord<>("baz", "qux"))); + KafkaTest.awaitProcessing(executableApp); + this.softly.assertThat(testClient.read() + .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .from("output", POLL_TIMEOUT)) + .hasSize(1) + .anySatisfy(outputRecord -> { + this.softly.assertThat(outputRecord.key()).isEqualTo("baz"); + this.softly.assertThat(outputRecord.value()).isEqualTo("qux"); + }); + } + } + } + + @Test + void shouldUseLegacyOffsetResetPolicyModifier() { + final StringApp app = new StringApp() { + @Override + public void buildTopology(final StreamsBuilderX builder) { + final KStreamX input = builder.stream("input", + ConsumedX.as("stream").withOffsetResetPolicy(Topology.AutoOffsetReset.LATEST)); input.to("output"); } }; diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/KStreamXTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/KStreamXTest.java index 7c1392820..56923f77e 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/KStreamXTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/KStreamXTest.java @@ -24,7 +24,6 @@ package com.bakdata.kafka; -import static java.util.Collections.emptyList; import static org.mockito.Mockito.doReturn; import static 
org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -49,12 +48,8 @@ import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.Printed; -import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.kstream.ValueMapperWithKey; -import org.apache.kafka.streams.kstream.ValueTransformerSupplier; -import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; import org.apache.kafka.streams.processor.api.FixedKeyRecord; import org.apache.kafka.streams.processor.api.ProcessorSupplier; @@ -3103,48 +3098,6 @@ public void buildTopology(final StreamsBuilderX builder) { } } - @Test - void shouldBranch() { - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final KStreamX input = builder.stream("input"); - final KStreamX[] branches = input.branch((k, v) -> true); - branches[0].to("output"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input() - .add("foo", "bar"); - topology.streamOutput() - .expectNextRecord() - .hasKey("foo") - .hasValue("bar") - .expectNoMoreRecord(); - } - } - - @Test - void shouldBranchNamed() { - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final KStreamX input = builder.stream("input"); - final KStreamX[] branches = input.branch(Named.as("branch"), (k, v) -> true); - branches[0].to("output"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input() - .add("foo", "bar"); - topology.streamOutput() - .expectNextRecord() - .hasKey("foo") - .hasValue("bar") - .expectNoMoreRecord(); - } - } - @Test void shouldTableJoin() { final StringApp app = new StringApp() { @@ -3639,863 +3592,4 @@ public void buildTopology(final StreamsBuilderX builder) { } } - @Test - void shouldRouteThrough() { - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final KStreamX input = builder.stream("input"); - final KStreamX through = input.through("intermediate"); - through.to("output"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input("input") - .add("foo", "bar"); - topology.streamOutput("intermediate") - .expectNextRecord() - .hasKey("foo") - .hasValue("bar") - .expectNoMoreRecord(); - topology.streamOutput("output") - .expectNextRecord() - .hasKey("foo") - .hasValue("bar") - .expectNoMoreRecord(); - } - } - - @Test - void shouldRouteThroughUsingProduced() { - final DoubleApp app = new DoubleApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final KStreamX input = - builder.stream("input", ConsumedX.with(Serdes.String(), Serdes.String())); - final KStreamX through = - input.through("intermediate", Produced.with(Serdes.String(), Serdes.String())); - through.to("output", ProducedX.with(Serdes.String(), Serdes.String())); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input("input") - .withKeySerde(Serdes.String()) - .withValueSerde(Serdes.String()) - .add("foo", "bar"); - topology.streamOutput("intermediate") - .withKeySerde(Serdes.String()) - .withValueSerde(Serdes.String()) - .expectNextRecord() - .hasKey("foo") - .hasValue("bar") - 
.expectNoMoreRecord(); - topology.streamOutput("output") - .withKeySerde(Serdes.String()) - .withValueSerde(Serdes.String()) - .expectNextRecord() - .hasKey("foo") - .hasValue("bar") - .expectNoMoreRecord(); - } - } - - @Test - void shouldTransform() { - final TransformerSupplier> transformer = - () -> new SimpleTransformer<>() { - - @Override - public KeyValue transform(final String key, final String value) { - if ("foo".equals(key) && "bar".equals(value)) { - return KeyValue.pair("baz", "qux"); - } - throw new UnsupportedOperationException(); - } - }; - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final KStreamX input = builder.stream("input"); - final KStreamX transformed = input.transform(transformer); - transformed.to("output"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input().add("foo", "bar"); - topology.streamOutput() - .expectNextRecord() - .hasKey("baz") - .hasValue("qux") - .expectNoMoreRecord(); - } - } - - @Test - void shouldTransformNamed() { - final TransformerSupplier> transformer = - () -> new SimpleTransformer<>() { - - @Override - public KeyValue transform(final String key, final String value) { - if ("foo".equals(key) && "bar".equals(value)) { - return KeyValue.pair("baz", "qux"); - } - throw new UnsupportedOperationException(); - } - }; - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final KStreamX input = builder.stream("input"); - final KStreamX transformed = input.transform(transformer, Named.as("transform")); - transformed.to("output"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input().add("foo", "bar"); - topology.streamOutput() - .expectNextRecord() - .hasKey("baz") - .hasValue("qux") - .expectNoMoreRecord(); - } - } - - @Test - void shouldTransformUsingStore() { - final TransformerSupplier> transformer = - () -> new SimpleTransformer<>() { - - @Override - public KeyValue transform(final String key, final String value) { - final KeyValueStore store = this.getStateStore("my-store"); - store.put(key, value); - return null; - } - }; - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final StoreBuilder> store = builder.stores() - .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("my-store"), Preconfigured.defaultSerde(), - Preconfigured.defaultSerde()); - builder.addStateStore(store); - final KStreamX input = builder.stream("input"); - input.transform(transformer, "my-store"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input().add("foo", "bar"); - final KeyValueStore store = - topology.getTestDriver().getKeyValueStore("my-store"); - this.softly.assertThat(store.get("foo")).isEqualTo("bar"); - } - } - - @Test - void shouldTransformNamedUsingStore() { - final TransformerSupplier> transformer = - () -> new SimpleTransformer<>() { - - @Override - public KeyValue transform(final String key, final String value) { - final KeyValueStore store = this.getStateStore("my-store"); - store.put(key, value); - return null; - } - }; - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final StoreBuilder> store = builder.stores() - .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("my-store"), Preconfigured.defaultSerde(), - Preconfigured.defaultSerde()); - builder.addStateStore(store); - final KStreamX input = 
builder.stream("input"); - input.transform(transformer, Named.as("transform"), "my-store"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input().add("foo", "bar"); - final KeyValueStore store = - topology.getTestDriver().getKeyValueStore("my-store"); - this.softly.assertThat(store.get("foo")).isEqualTo("bar"); - } - } - - @Test - void shouldFlatTransform() { - final TransformerSupplier>> transformer = - () -> new SimpleTransformer<>() { - - @Override - public Iterable> transform(final String key, final String value) { - if ("foo".equals(key) && "bar".equals(value)) { - return List.of(KeyValue.pair("baz", "qux")); - } - throw new UnsupportedOperationException(); - } - }; - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final KStreamX input = builder.stream("input"); - final KStreamX transformed = input.flatTransform(transformer); - transformed.to("output"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input().add("foo", "bar"); - topology.streamOutput() - .expectNextRecord() - .hasKey("baz") - .hasValue("qux") - .expectNoMoreRecord(); - } - } - - @Test - void shouldFlatTransformNamed() { - final TransformerSupplier>> transformer = - () -> new SimpleTransformer<>() { - - @Override - public Iterable> transform(final String key, final String value) { - if ("foo".equals(key) && "bar".equals(value)) { - return List.of(KeyValue.pair("baz", "qux")); - } - throw new UnsupportedOperationException(); - } - }; - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final KStreamX input = builder.stream("input"); - final KStreamX transformed = - input.flatTransform(transformer, Named.as("flatTransform")); - transformed.to("output"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input().add("foo", "bar"); - topology.streamOutput() - .expectNextRecord() - .hasKey("baz") - .hasValue("qux") - .expectNoMoreRecord(); - } - } - - @Test - void shouldFlatTransformUsingStore() { - final TransformerSupplier>> transformer = - () -> new SimpleTransformer<>() { - - @Override - public Iterable> transform(final String key, final String value) { - final KeyValueStore store = this.getStateStore("my-store"); - store.put(key, value); - return null; - } - }; - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final StoreBuilder> store = builder.stores() - .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("my-store"), Preconfigured.defaultSerde(), - Preconfigured.defaultSerde()); - builder.addStateStore(store); - final KStreamX input = builder.stream("input"); - input.flatTransform(transformer, "my-store"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input().add("foo", "bar"); - final KeyValueStore store = - topology.getTestDriver().getKeyValueStore("my-store"); - this.softly.assertThat(store.get("foo")).isEqualTo("bar"); - } - } - - @Test - void shouldFlatTransformNamedUsingStore() { - final TransformerSupplier>> transformer = - () -> new SimpleTransformer<>() { - - @Override - public Iterable> transform(final String key, final String value) { - final KeyValueStore store = this.getStateStore("my-store"); - store.put(key, value); - return null; - } - }; - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final StoreBuilder> store = builder.stores() - 
.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("my-store"), Preconfigured.defaultSerde(), - Preconfigured.defaultSerde()); - builder.addStateStore(store); - final KStreamX input = builder.stream("input"); - input.flatTransform(transformer, Named.as("flatTransform"), "my-store"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input().add("foo", "bar"); - final KeyValueStore store = - topology.getTestDriver().getKeyValueStore("my-store"); - this.softly.assertThat(store.get("foo")).isEqualTo("bar"); - } - } - - @Test - void shouldTransformValues() { - final ValueTransformerSupplier transformer = () -> new SimpleValueTransformer<>() { - - @Override - public String transform(final String value) { - if ("bar".equals(value)) { - return "baz"; - } - throw new UnsupportedOperationException(); - } - }; - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final KStreamX input = builder.stream("input"); - final KStreamX transformed = input.transformValues(transformer); - transformed.to("output"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input().add("foo", "bar"); - topology.streamOutput() - .expectNextRecord() - .hasKey("foo") - .hasValue("baz") - .expectNoMoreRecord(); - } - } - - @Test - void shouldTransformValuesNamed() { - final ValueTransformerSupplier transformer = () -> new SimpleValueTransformer<>() { - - @Override - public String transform(final String value) { - if ("bar".equals(value)) { - return "baz"; - } - throw new UnsupportedOperationException(); - } - }; - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final KStreamX input = builder.stream("input"); - final KStreamX transformed = input.transformValues(transformer, Named.as("transform")); - transformed.to("output"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input().add("foo", "bar"); - topology.streamOutput() - .expectNextRecord() - .hasKey("foo") - .hasValue("baz") - .expectNoMoreRecord(); - } - } - - @Test - void shouldTransformValuesUsingStore() { - final ValueTransformerSupplier transformer = () -> new SimpleValueTransformer<>() { - - @Override - public String transform(final String value) { - final KeyValueStore store = this.getStateStore("my-store"); - store.put(value, value); - return null; - } - }; - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final StoreBuilder> store = builder.stores() - .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("my-store"), Preconfigured.defaultSerde(), - Preconfigured.defaultSerde()); - builder.addStateStore(store); - final KStreamX input = builder.stream("input"); - input.transformValues(transformer, "my-store"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input().add("foo", "bar"); - final KeyValueStore store = - topology.getTestDriver().getKeyValueStore("my-store"); - this.softly.assertThat(store.get("bar")).isEqualTo("bar"); - } - } - - @Test - void shouldTransformValuesNamedUsingStore() { - final ValueTransformerSupplier transformer = () -> new SimpleValueTransformer<>() { - - @Override - public String transform(final String value) { - final KeyValueStore store = this.getStateStore("my-store"); - store.put(value, value); - return null; - } - }; - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final 
StoreBuilder> store = builder.stores() - .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("my-store"), Preconfigured.defaultSerde(), - Preconfigured.defaultSerde()); - builder.addStateStore(store); - final KStreamX input = builder.stream("input"); - input.transformValues(transformer, Named.as("transform"), "my-store"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input().add("foo", "bar"); - final KeyValueStore store = - topology.getTestDriver().getKeyValueStore("my-store"); - this.softly.assertThat(store.get("bar")).isEqualTo("bar"); - } - } - - @Test - void shouldFlatTransformValues() { - final ValueTransformerSupplier> transformer = () -> new SimpleValueTransformer<>() { - - @Override - public Iterable transform(final String value) { - if ("bar".equals(value)) { - return List.of("baz"); - } - throw new UnsupportedOperationException(); - } - }; - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final KStreamX input = builder.stream("input"); - final KStreamX transformed = input.flatTransformValues(transformer); - transformed.to("output"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input().add("foo", "bar"); - topology.streamOutput() - .expectNextRecord() - .hasKey("foo") - .hasValue("baz") - .expectNoMoreRecord(); - } - } - - @Test - void shouldFlatTransformValuesNamed() { - final ValueTransformerSupplier> transformer = () -> new SimpleValueTransformer<>() { - - @Override - public Iterable transform(final String value) { - if ("bar".equals(value)) { - return List.of("baz"); - } - throw new UnsupportedOperationException(); - } - }; - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final KStreamX input = builder.stream("input"); - final KStreamX transformed = - input.flatTransformValues(transformer, Named.as("flatTransform")); - transformed.to("output"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input().add("foo", "bar"); - topology.streamOutput() - .expectNextRecord() - .hasKey("foo") - .hasValue("baz") - .expectNoMoreRecord(); - } - } - - @Test - void shouldFlatTransformValuesUsingStore() { - final ValueTransformerSupplier> transformer = () -> new SimpleValueTransformer<>() { - - @Override - public Iterable transform(final String value) { - final KeyValueStore store = this.getStateStore("my-store"); - store.put(value, value); - return emptyList(); - } - }; - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final StoreBuilder> store = builder.stores() - .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("my-store"), Preconfigured.defaultSerde(), - Preconfigured.defaultSerde()); - builder.addStateStore(store); - final KStreamX input = builder.stream("input"); - input.flatTransformValues(transformer, "my-store"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input().add("foo", "bar"); - final KeyValueStore store = - topology.getTestDriver().getKeyValueStore("my-store"); - this.softly.assertThat(store.get("bar")).isEqualTo("bar"); - } - } - - @Test - void shouldFlatTransformValuesNamedUsingStore() { - final ValueTransformerSupplier> transformer = () -> new SimpleValueTransformer<>() { - - @Override - public Iterable transform(final String value) { - final KeyValueStore store = this.getStateStore("my-store"); - store.put(value, value); - return emptyList(); - } - }; - final 
StringApp app = new StringApp() {
-            @Override
-            public void buildTopology(final StreamsBuilderX builder) {
-                final StoreBuilder<KeyValueStore<String, String>> store = builder.stores()
-                        .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("my-store"), Preconfigured.defaultSerde(),
-                                Preconfigured.defaultSerde());
-                builder.addStateStore(store);
-                final KStreamX<String, String> input = builder.stream("input");
-                input.flatTransformValues(transformer, Named.as("flatTransform"), "my-store");
-            }
-        };
-        try (final TestTopology<String, String> topology = app.startApp()) {
-            topology.input().add("foo", "bar");
-            final KeyValueStore<String, String> store =
-                    topology.getTestDriver().getKeyValueStore("my-store");
-            this.softly.assertThat(store.get("bar")).isEqualTo("bar");
-        }
-    }
-
-    @Test
-    void shouldTransformValuesWithKey() {
-        final ValueTransformerWithKeySupplier<String, String, String> transformer =
-                () -> new SimpleValueTransformerWithKey<>() {
-
-                    @Override
-                    public String transform(final String key, final String value) {
-                        if ("foo".equals(key) && "bar".equals(value)) {
-                            return "baz";
-                        }
-                        throw new UnsupportedOperationException();
-                    }
-                };
-        final StringApp app = new StringApp() {
-            @Override
-            public void buildTopology(final StreamsBuilderX builder) {
-                final KStreamX<String, String> input = builder.stream("input");
-                final KStreamX<String, String> transformed = input.transformValues(transformer);
-                transformed.to("output");
-            }
-        };
-        try (final TestTopology<String, String> topology = app.startApp()) {
-            topology.input().add("foo", "bar");
-            topology.streamOutput()
-                    .expectNextRecord()
-                    .hasKey("foo")
-                    .hasValue("baz")
-                    .expectNoMoreRecord();
-        }
-    }
-
-    @Test
-    void shouldTransformValuesWithKeyNamed() {
-        final ValueTransformerWithKeySupplier<String, String, String> transformer =
-                () -> new SimpleValueTransformerWithKey<>() {
-
-                    @Override
-                    public String transform(final String key, final String value) {
-                        if ("foo".equals(key) && "bar".equals(value)) {
-                            return "baz";
-                        }
-                        throw new UnsupportedOperationException();
-                    }
-                };
-        final StringApp app = new StringApp() {
-            @Override
-            public void buildTopology(final StreamsBuilderX builder) {
-                final KStreamX<String, String> input = builder.stream("input");
-                final KStreamX<String, String> transformed = input.transformValues(transformer, Named.as("transform"));
-                transformed.to("output");
-            }
-        };
-        try (final TestTopology<String, String> topology = app.startApp()) {
-            topology.input().add("foo", "bar");
-            topology.streamOutput()
-                    .expectNextRecord()
-                    .hasKey("foo")
-                    .hasValue("baz")
-                    .expectNoMoreRecord();
-        }
-    }
-
-    @Test
-    void shouldTransformValuesWithKeyUsingStore() {
-        final ValueTransformerWithKeySupplier<String, String, String> transformer =
-                () -> new SimpleValueTransformerWithKey<>() {
-
-                    @Override
-                    public String transform(final String key, final String value) {
-                        final KeyValueStore<String, String> store = this.getStateStore("my-store");
-                        store.put(key, value);
-                        return null;
-                    }
-                };
-        final StringApp app = new StringApp() {
-            @Override
-            public void buildTopology(final StreamsBuilderX builder) {
-                final StoreBuilder<KeyValueStore<String, String>> store = builder.stores()
-                        .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("my-store"), Preconfigured.defaultSerde(),
-                                Preconfigured.defaultSerde());
-                builder.addStateStore(store);
-                final KStreamX<String, String> input = builder.stream("input");
-                input.transformValues(transformer, "my-store");
-            }
-        };
-        try (final TestTopology<String, String> topology = app.startApp()) {
-            topology.input().add("foo", "bar");
-            final KeyValueStore<String, String> store =
-                    topology.getTestDriver().getKeyValueStore("my-store");
-            this.softly.assertThat(store.get("foo")).isEqualTo("bar");
-        }
-    }
-
-    @Test
-    void shouldTransformValuesWithKeyNamedUsingStore() {
-        final ValueTransformerWithKeySupplier<String, String, String> transformer =
-                () -> new SimpleValueTransformerWithKey<>() {
-
-                    @Override
-                    public String transform(final String key, final String value) {
-                        final KeyValueStore<String, String> store = this.getStateStore("my-store");
-                        store.put(key, value);
-                        return null;
-                    }
-                };
-        final StringApp app = new StringApp() {
-            @Override
-            public void buildTopology(final StreamsBuilderX builder) {
-                final StoreBuilder<KeyValueStore<String, String>> store = builder.stores()
-                        .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("my-store"), Preconfigured.defaultSerde(),
-                                Preconfigured.defaultSerde());
-                builder.addStateStore(store);
-                final KStreamX<String, String> input = builder.stream("input");
-                input.transformValues(transformer, Named.as("transform"), "my-store");
-            }
-        };
-        try (final TestTopology<String, String> topology = app.startApp()) {
-            topology.input().add("foo", "bar");
-            final KeyValueStore<String, String> store =
-                    topology.getTestDriver().getKeyValueStore("my-store");
-            this.softly.assertThat(store.get("foo")).isEqualTo("bar");
-        }
-    }
-
-    @Test
-    void shouldFlatTransformValuesWithKey() {
-        final ValueTransformerWithKeySupplier<String, String, Iterable<String>> transformer =
-                () -> new SimpleValueTransformerWithKey<>() {
-
-                    @Override
-                    public Iterable<String> transform(final String key, final String value) {
-                        if ("foo".equals(key) && "bar".equals(value)) {
-                            return List.of("baz");
-                        }
-                        throw new UnsupportedOperationException();
-                    }
-                };
-        final StringApp app = new StringApp() {
-            @Override
-            public void buildTopology(final StreamsBuilderX builder) {
-                final KStreamX<String, String> input = builder.stream("input");
-                final KStreamX<String, String> transformed = input.flatTransformValues(transformer);
-                transformed.to("output");
-            }
-        };
-        try (final TestTopology<String, String> topology = app.startApp()) {
-            topology.input().add("foo", "bar");
-            topology.streamOutput()
-                    .expectNextRecord()
-                    .hasKey("foo")
-                    .hasValue("baz")
-                    .expectNoMoreRecord();
-        }
-    }
-
-    @Test
-    void shouldFlatTransformValuesWithKeyNamed() {
-        final ValueTransformerWithKeySupplier<String, String, Iterable<String>> transformer =
-                () -> new SimpleValueTransformerWithKey<>() {
-
-                    @Override
-                    public Iterable<String> transform(final String key, final String value) {
-                        if ("foo".equals(key) && "bar".equals(value)) {
-                            return List.of("baz");
-                        }
-                        throw new UnsupportedOperationException();
-                    }
-                };
-        final StringApp app = new StringApp() {
-            @Override
-            public void buildTopology(final StreamsBuilderX builder) {
-                final KStreamX<String, String> input = builder.stream("input");
-                final KStreamX<String, String> transformed =
-                        input.flatTransformValues(transformer, Named.as("flatTransform"));
-                transformed.to("output");
-            }
-        };
-        try (final TestTopology<String, String> topology = app.startApp()) {
-            topology.input().add("foo", "bar");
-            topology.streamOutput()
-                    .expectNextRecord()
-                    .hasKey("foo")
-                    .hasValue("baz")
-                    .expectNoMoreRecord();
-        }
-    }
-
-    @Test
-    void shouldFlatTransformValuesWithKeyUsingStore() {
-        final ValueTransformerWithKeySupplier<String, String, Iterable<String>> transformer =
-                () -> new SimpleValueTransformerWithKey<>() {
-
-                    @Override
-                    public Iterable<String> transform(final String key, final String value) {
-                        final KeyValueStore<String, String> store = this.getStateStore("my-store");
-                        store.put(key, value);
-                        return emptyList();
-                    }
-                };
-        final StringApp app = new StringApp() {
-            @Override
-            public void buildTopology(final StreamsBuilderX builder) {
-                final StoreBuilder<KeyValueStore<String, String>> store = builder.stores()
-                        .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("my-store"), Preconfigured.defaultSerde(),
-                                Preconfigured.defaultSerde());
-                builder.addStateStore(store);
-                final KStreamX<String, String> input = builder.stream("input");
-                input.flatTransformValues(transformer, "my-store");
-            }
-        };
-        try (final TestTopology<String, String> topology = app.startApp()) {
-            topology.input().add("foo", "bar");
-            final KeyValueStore<String, String> store =
-                    topology.getTestDriver().getKeyValueStore("my-store");
-            this.softly.assertThat(store.get("foo")).isEqualTo("bar");
-        }
-    }
-
-    @Test
-    void shouldFlatTransformValuesWithKeyNamedUsingStore() {
-        final ValueTransformerWithKeySupplier<String, String, Iterable<String>> transformer =
-                () -> new SimpleValueTransformerWithKey<>() {
-
-                    @Override
-                    public Iterable<String> transform(final String key, final String value) {
-                        final KeyValueStore<String, String> store = this.getStateStore("my-store");
-                        store.put(key, value);
-                        return emptyList();
-                    }
-                };
-        final StringApp app = new StringApp() {
-            @Override
-            public void buildTopology(final StreamsBuilderX builder) {
-                final StoreBuilder<KeyValueStore<String, String>> store = builder.stores()
-                        .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("my-store"), Preconfigured.defaultSerde(),
-                                Preconfigured.defaultSerde());
-                builder.addStateStore(store);
-                final KStreamX<String, String> input = builder.stream("input");
-                input.flatTransformValues(transformer, Named.as("flatTransform"), "my-store");
-            }
-        };
-        try (final TestTopology<String, String> topology = app.startApp()) {
-            topology.input().add("foo", "bar");
-            final KeyValueStore<String, String> store =
-                    topology.getTestDriver().getKeyValueStore("my-store");
-            this.softly.assertThat(store.get("foo")).isEqualTo("bar");
-        }
-    }
-
-    @Test
-    void shouldLegacyProcess() {
-        final org.apache.kafka.streams.processor.ProcessorSupplier<String, String> processor =
-                () -> new SimpleLegacyProcessor<>() {
-
-                    @Override
-                    public void process(final String key, final String value) {
-                        final KeyValueStore<String, String> store = this.getStateStore("my-store");
-                        store.put(key, value);
-                    }
-                };
-        final StringApp app = new StringApp() {
-            @Override
-            public void buildTopology(final StreamsBuilderX builder) {
-                final StoreBuilder<KeyValueStore<String, String>> store = builder.stores()
-                        .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("my-store"), Preconfigured.defaultSerde(),
-                                Preconfigured.defaultSerde());
-                builder.addStateStore(store);
-                final KStreamX<String, String> input = builder.stream("input");
-                input.process(processor, "my-store");
-            }
-        };
-        try (final TestTopology<String, String> topology = app.startApp()) {
-            topology.input().add("foo", "bar");
-            final KeyValueStore<String, String> store =
-                    topology.getTestDriver().getKeyValueStore("my-store");
-            this.softly.assertThat(store.get("foo")).isEqualTo("bar");
-        }
-    }
-
-    @Test
-    void shouldLegacyProcessNamed() {
-        final org.apache.kafka.streams.processor.ProcessorSupplier<String, String> processor =
-                () -> new SimpleLegacyProcessor<>() {
-
-                    @Override
-                    public void process(final String key, final String value) {
-                        final KeyValueStore<String, String> store = this.getStateStore("my-store");
-                        store.put(key, value);
-                    }
-                };
-        final StringApp app = new StringApp() {
-            @Override
-            public void buildTopology(final StreamsBuilderX builder) {
-                final StoreBuilder<KeyValueStore<String, String>> store = builder.stores()
-                        .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("my-store"), Preconfigured.defaultSerde(),
-                                Preconfigured.defaultSerde());
-                builder.addStateStore(store);
-                final KStreamX<String, String> input = builder.stream("input");
-                input.process(processor, Named.as("process"), "my-store");
-            }
-        };
-        try (final TestTopology<String, String> topology = app.startApp()) {
-            topology.input().add("foo", "bar");
-            final KeyValueStore<String, String> store =
-                    topology.getTestDriver().getKeyValueStore("my-store");
-            this.softly.assertThat(store.get("foo")).isEqualTo("bar");
-        }
-    }
 }
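The deletions above remove the KStreamXTest cases for transformValues(), flatTransformValues(), and the legacy process(), since the underlying Transformer and old Processor APIs are gone in Kafka Streams 4.0. A minimal sketch of the processValues() replacement for the store-writing cases, assuming the current FixedKeyProcessor API (the class name StoreWritingProcessor is illustrative; "my-store" mirrors the removed tests):

import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.state.KeyValueStore;

class StoreWritingProcessor implements FixedKeyProcessor<String, String, String> {
    private KeyValueStore<String, String> store;

    @Override
    public void init(final FixedKeyProcessorContext<String, String> context) {
        this.store = context.getStateStore("my-store"); // store attached via builder.addStateStore()
    }

    @Override
    public void process(final FixedKeyRecord<String, String> record) {
        this.store.put(record.key(), record.value()); // same effect as the removed ValueTransformerWithKey
    }
}

A topology would wire it with input.processValues(() -> new StoreWritingProcessor(), "my-store"), mirroring the removed transformValues(transformer, "my-store") calls.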
diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/KTableXTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/KTableXTest.java
index 19145dec4..2f99c04f6 100644
--- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/KTableXTest.java
+++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/KTableXTest.java
@@ -1337,14 +1337,14 @@ public void buildTopology(final StreamsBuilderX builder) {
     }
 
     @Test
-    void shouldFKeyJoinNamed() {
+    void shouldFKeyJoinTableJoined() {
         final StringApp app = new StringApp() {
             @Override
             public void buildTopology(final StreamsBuilderX builder) {
                 final KTableX<String, String> input = builder.table("input");
                 final KTableX<String, String> otherInput = builder.table("other_input");
                 final KTableX<String, String> joined =
-                        input.join(otherInput, Function.identity(), (v1, v2) -> v1 + v2, Named.as("join"));
+                        input.join(otherInput, Function.identity(), (v1, v2) -> v1 + v2, TableJoined.as("join"));
                 joined.toStream().to("output");
             }
         };
@@ -1362,23 +1362,31 @@ public void buildTopology(final StreamsBuilderX builder) {
     }
 
     @Test
-    void shouldFKeyJoinTableJoined() {
-        final StringApp app = new StringApp() {
+    void shouldFKeyJoinUsingMaterialized() {
+        final DoubleApp app = new DoubleApp() {
             @Override
             public void buildTopology(final StreamsBuilderX builder) {
-                final KTableX<String, String> input = builder.table("input");
-                final KTableX<String, String> otherInput = builder.table("other_input");
-                final KTableX<String, String> joined =
-                        input.join(otherInput, Function.identity(), (v1, v2) -> v1 + v2, TableJoined.as("join"));
-                joined.toStream().to("output");
+                final KTableX<String, String> input =
+                        builder.table("input", ConsumedX.with(Serdes.String(), Serdes.String()));
+                final KTableX<String, String> otherInput =
+                        builder.table("other_input", ConsumedX.with(Serdes.String(), Serdes.String()));
+                final KTableX<String, String> joined = input.join(otherInput, Function.identity(), (v1, v2) -> v1 + v2,
+                        MaterializedX.with(Serdes.String(), Serdes.String()));
+                joined.toStream().to("output", ProducedX.with(Serdes.String(), Serdes.String()));
             }
         };
-        try (final TestTopology<String, String> topology = app.startApp()) {
+        try (final TestTopology<Double, Double> topology = app.startApp()) {
             topology.input("input")
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.String())
                     .add("foo", "bar");
             topology.input("other_input")
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.String())
                     .add("bar", "baz");
             topology.streamOutput()
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.String())
                     .expectNextRecord()
                     .hasKey("foo")
                     .hasValue("barbaz")
@@ -1387,7 +1395,7 @@ public void buildTopology(final StreamsBuilderX builder) {
     }
 
     @Test
-    void shouldFKeyJoinUsingMaterialized() {
+    void shouldFKeyJoinTableJoinedUsingMaterialized() {
         final DoubleApp app = new DoubleApp() {
             @Override
             public void buildTopology(final StreamsBuilderX builder) {
@@ -1395,8 +1403,9 @@ public void buildTopology(final StreamsBuilderX builder) {
                 final KTableX<String, String> input =
                         builder.table("input", ConsumedX.with(Serdes.String(), Serdes.String()));
                 final KTableX<String, String> otherInput =
                         builder.table("other_input", ConsumedX.with(Serdes.String(), Serdes.String()));
-                final KTableX<String, String> joined = input.join(otherInput, Function.identity(), (v1, v2) -> v1 + v2,
-                        MaterializedX.with(Serdes.String(), Serdes.String()));
+                final KTableX<String, String> joined =
+                        input.join(otherInput, Function.identity(), (v1, v2) -> v1 + v2, TableJoined.as("join"),
+                                MaterializedX.with(Serdes.String(), Serdes.String()));
                 joined.toStream().to("output", ProducedX.with(Serdes.String(), Serdes.String()));
             }
         };
@@ -1420,7 +1429,63 @@ public void buildTopology(final StreamsBuilderX builder) {
     }
 
     @Test
-    void shouldFKeyJoinNamedUsingMaterialized() {
+    void shouldLeftFKeyJoin() {
+        final StringApp app = new StringApp() {
+            @Override
+            public void buildTopology(final StreamsBuilderX builder) {
+                final KTableX<String, String> input = builder.table("input");
+                final KTableX<String, String> otherInput = builder.table("other_input");
+                final KTableX<String, String> joined = input.leftJoin(otherInput, Function.identity(),
+                        (v1, v2) -> v2 == null ? v1 : v1 + v2);
+                joined.toStream().to("output");
+            }
+        };
+        try (final TestTopology<String, String> topology = app.startApp()) {
+            topology.input("input")
+                    .add("foo", "bar");
+            topology.input("other_input")
+                    .add("bar", "baz");
+            topology.streamOutput()
+                    .expectNextRecord()
+                    .hasKey("foo")
+                    .hasValue("bar")
+                    .expectNextRecord()
+                    .hasKey("foo")
+                    .hasValue("barbaz")
+                    .expectNoMoreRecord();
+        }
+    }
+
+    @Test
+    void shouldLeftFKeyJoinTableJoined() {
+        final StringApp app = new StringApp() {
+            @Override
+            public void buildTopology(final StreamsBuilderX builder) {
+                final KTableX<String, String> input = builder.table("input");
+                final KTableX<String, String> otherInput = builder.table("other_input");
+                final KTableX<String, String> joined = input.leftJoin(otherInput, Function.identity(),
+                        (v1, v2) -> v2 == null ? v1 : v1 + v2, TableJoined.as("join"));
+                joined.toStream().to("output");
+            }
+        };
+        try (final TestTopology<String, String> topology = app.startApp()) {
+            topology.input("input")
+                    .add("foo", "bar");
+            topology.input("other_input")
+                    .add("bar", "baz");
+            topology.streamOutput()
+                    .expectNextRecord()
+                    .hasKey("foo")
+                    .hasValue("bar")
+                    .expectNextRecord()
+                    .hasKey("foo")
+                    .hasValue("barbaz")
+                    .expectNoMoreRecord();
+        }
+    }
+
+    @Test
+    void shouldLeftFKeyJoinUsingMaterialized() {
         final DoubleApp app = new DoubleApp() {
             @Override
             public void buildTopology(final StreamsBuilderX builder) {
@@ -1429,8 +1494,8 @@ public void buildTopology(final StreamsBuilderX builder) {
                 final KTableX<String, String> otherInput =
                         builder.table("other_input", ConsumedX.with(Serdes.String(), Serdes.String()));
                 final KTableX<String, String> joined =
-                        input.join(otherInput, Function.identity(), (v1, v2) -> v1 + v2, Named.as("join"),
-                                Materialized.with(Serdes.String(), Serdes.String()));
+                        input.leftJoin(otherInput, Function.identity(), (v1, v2) -> v2 == null ? v1 : v1 + v2,
+                                MaterializedX.with(Serdes.String(), Serdes.String()));
                 joined.toStream().to("output", ProducedX.with(Serdes.String(), Serdes.String()));
             }
         };
@@ -1448,13 +1513,16 @@ public void buildTopology(final StreamsBuilderX builder) {
                     .withValueSerde(Serdes.String())
                     .expectNextRecord()
                     .hasKey("foo")
+                    .hasValue("bar")
+                    .expectNextRecord()
+                    .hasKey("foo")
                     .hasValue("barbaz")
                     .expectNoMoreRecord();
         }
     }
 
     @Test
-    void shouldFKeyJoinTableJoinedUsingMaterialized() {
+    void shouldLeftFKeyJoinTableJoinedUsingMaterialized() {
         final DoubleApp app = new DoubleApp() {
             @Override
             public void buildTopology(final StreamsBuilderX builder) {
@@ -1463,8 +1531,8 @@ public void buildTopology(final StreamsBuilderX builder) {
                 final KTableX<String, String> otherInput =
                         builder.table("other_input", ConsumedX.with(Serdes.String(), Serdes.String()));
                 final KTableX<String, String> joined =
-                        input.join(otherInput, Function.identity(), (v1, v2) -> v1 + v2, TableJoined.as("join"),
-                                MaterializedX.with(Serdes.String(), Serdes.String()));
+                        input.leftJoin(otherInput, Function.identity(), (v1, v2) -> v2 == null ? v1 : v1 + v2,
+                                TableJoined.as("join"), MaterializedX.with(Serdes.String(), Serdes.String()));
                 joined.toStream().to("output", ProducedX.with(Serdes.String(), Serdes.String()));
             }
         };
@@ -1482,20 +1550,22 @@ public void buildTopology(final StreamsBuilderX builder) {
                     .withValueSerde(Serdes.String())
                     .expectNextRecord()
                     .hasKey("foo")
+                    .hasValue("bar")
+                    .expectNextRecord()
+                    .hasKey("foo")
                     .hasValue("barbaz")
                     .expectNoMoreRecord();
         }
     }
 
     @Test
-    void shouldLeftFKeyJoin() {
+    void shouldFKeyJoinWithKey() {
         final StringApp app = new StringApp() {
             @Override
             public void buildTopology(final StreamsBuilderX builder) {
                 final KTableX<String, String> input = builder.table("input");
                 final KTableX<String, String> otherInput = builder.table("other_input");
-                final KTableX<String, String> joined = input.leftJoin(otherInput, Function.identity(),
-                        (v1, v2) -> v2 == null ? v1 : v1 + v2);
+                final KTableX<String, String> joined = input.join(otherInput, (k, v) -> v, (v1, v2) -> v1 + v2);
                 joined.toStream().to("output");
             }
         };
@@ -1505,9 +1575,6 @@ public void buildTopology(final StreamsBuilderX builder) {
             topology.input("other_input")
                     .add("bar", "baz");
             topology.streamOutput()
-                    .expectNextRecord()
-                    .hasKey("foo")
-                    .hasValue("bar")
                     .expectNextRecord()
                     .hasKey("foo")
                     .hasValue("barbaz")
@@ -1516,14 +1583,14 @@ public void buildTopology(final StreamsBuilderX builder) {
     }
 
     @Test
-    void shouldLeftFKeyJoinNamed() {
+    void shouldFKeyJoinTableJoinedWithKey() {
         final StringApp app = new StringApp() {
             @Override
             public void buildTopology(final StreamsBuilderX builder) {
                 final KTableX<String, String> input = builder.table("input");
                 final KTableX<String, String> otherInput = builder.table("other_input");
-                final KTableX<String, String> joined = input.leftJoin(otherInput, Function.identity(),
-                        (v1, v2) -> v2 == null ? v1 : v1 + v2, Named.as("join"));
+                final KTableX<String, String> joined =
+                        input.join(otherInput, (k, v) -> v, (v1, v2) -> v1 + v2, TableJoined.as("join"));
                 joined.toStream().to("output");
             }
         };
@@ -1533,9 +1600,6 @@ public void buildTopology(final StreamsBuilderX builder) {
             topology.input("other_input")
                     .add("bar", "baz");
             topology.streamOutput()
-                    .expectNextRecord()
-                    .hasKey("foo")
-                    .hasValue("bar")
                     .expectNextRecord()
                     .hasKey("foo")
                     .hasValue("barbaz")
@@ -1544,26 +1608,31 @@ public void buildTopology(final StreamsBuilderX builder) {
     }
 
     @Test
-    void shouldLeftFKeyJoinTableJoined() {
-        final StringApp app = new StringApp() {
+    void shouldFKeyJoinWithKeyUsingMaterialized() {
+        final DoubleApp app = new DoubleApp() {
             @Override
             public void buildTopology(final StreamsBuilderX builder) {
-                final KTableX<String, String> input = builder.table("input");
-                final KTableX<String, String> otherInput = builder.table("other_input");
-                final KTableX<String, String> joined = input.leftJoin(otherInput, Function.identity(),
-                        (v1, v2) -> v2 == null ? v1 : v1 + v2, TableJoined.as("join"));
-                joined.toStream().to("output");
+                final KTableX<String, String> input =
+                        builder.table("input", ConsumedX.with(Serdes.String(), Serdes.String()));
+                final KTableX<String, String> otherInput =
+                        builder.table("other_input", ConsumedX.with(Serdes.String(), Serdes.String()));
+                final KTableX<String, String> joined = input.join(otherInput, (k, v) -> v, (v1, v2) -> v1 + v2,
+                        MaterializedX.with(Serdes.String(), Serdes.String()));
+                joined.toStream().to("output", ProducedX.with(Serdes.String(), Serdes.String()));
             }
         };
-        try (final TestTopology<String, String> topology = app.startApp()) {
+        try (final TestTopology<Double, Double> topology = app.startApp()) {
             topology.input("input")
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.String())
                     .add("foo", "bar");
             topology.input("other_input")
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.String())
                     .add("bar", "baz");
             topology.streamOutput()
-                    .expectNextRecord()
-                    .hasKey("foo")
-                    .hasValue("bar")
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.String())
                     .expectNextRecord()
                     .hasKey("foo")
                     .hasValue("barbaz")
@@ -1572,7 +1641,7 @@ public void buildTopology(final StreamsBuilderX builder) {
     }
 
     @Test
-    void shouldLeftFKeyJoinUsingMaterialized() {
+    void shouldFKeyJoinTableJoinedWithKeyUsingMaterialized() {
         final DoubleApp app = new DoubleApp() {
             @Override
             public void buildTopology(final StreamsBuilderX builder) {
@@ -1581,7 +1650,7 @@ public void buildTopology(final StreamsBuilderX builder) {
                 final KTableX<String, String> otherInput =
                         builder.table("other_input", ConsumedX.with(Serdes.String(), Serdes.String()));
                 final KTableX<String, String> joined =
-                        input.leftJoin(otherInput, Function.identity(), (v1, v2) -> v2 == null ? v1 : v1 + v2,
+                        input.join(otherInput, (k, v) -> v, (v1, v2) -> v1 + v2, TableJoined.as("join"),
                                 MaterializedX.with(Serdes.String(), Serdes.String()));
                 joined.toStream().to("output", ProducedX.with(Serdes.String(), Serdes.String()));
             }
@@ -1598,6 +1667,31 @@ public void buildTopology(final StreamsBuilderX builder) {
             topology.streamOutput()
                     .withKeySerde(Serdes.String())
                     .withValueSerde(Serdes.String())
+                    .expectNextRecord()
+                    .hasKey("foo")
+                    .hasValue("barbaz")
+                    .expectNoMoreRecord();
+        }
+    }
+
+    @Test
+    void shouldLeftFKeyJoinWithKey() {
+        final StringApp app = new StringApp() {
+            @Override
+            public void buildTopology(final StreamsBuilderX builder) {
+                final KTableX<String, String> input = builder.table("input");
+                final KTableX<String, String> otherInput = builder.table("other_input");
+                final KTableX<String, String> joined = input.leftJoin(otherInput, (k, v) -> v,
+                        (v1, v2) -> v2 == null ? v1 : v1 + v2);
+                joined.toStream().to("output");
+            }
+        };
+        try (final TestTopology<String, String> topology = app.startApp()) {
+            topology.input("input")
+                    .add("foo", "bar");
+            topology.input("other_input")
+                    .add("bar", "baz");
+            topology.streamOutput()
                     .expectNextRecord()
                     .hasKey("foo")
                     .hasValue("bar")
@@ -1609,7 +1703,35 @@ public void buildTopology(final StreamsBuilderX builder) {
     }
 
     @Test
-    void shouldLeftFKeyJoinNamedUsingMaterialized() {
+    void shouldLeftFKeyJoinTableJoinedWithKey() {
+        final StringApp app = new StringApp() {
+            @Override
+            public void buildTopology(final StreamsBuilderX builder) {
+                final KTableX<String, String> input = builder.table("input");
+                final KTableX<String, String> otherInput = builder.table("other_input");
+                final KTableX<String, String> joined = input.leftJoin(otherInput, (k, v) -> v,
+                        (v1, v2) -> v2 == null ? v1 : v1 + v2, TableJoined.as("join"));
+                joined.toStream().to("output");
+            }
+        };
+        try (final TestTopology<String, String> topology = app.startApp()) {
+            topology.input("input")
+                    .add("foo", "bar");
+            topology.input("other_input")
+                    .add("bar", "baz");
+            topology.streamOutput()
+                    .expectNextRecord()
+                    .hasKey("foo")
+                    .hasValue("bar")
+                    .expectNextRecord()
+                    .hasKey("foo")
+                    .hasValue("barbaz")
+                    .expectNoMoreRecord();
+        }
+    }
+
+    @Test
+    void shouldLeftFKeyJoinWithKeyUsingMaterialized() {
         final DoubleApp app = new DoubleApp() {
             @Override
             public void buildTopology(final StreamsBuilderX builder) {
@@ -1618,8 +1740,8 @@ public void buildTopology(final StreamsBuilderX builder) {
                 final KTableX<String, String> otherInput =
                         builder.table("other_input", ConsumedX.with(Serdes.String(), Serdes.String()));
                 final KTableX<String, String> joined =
-                        input.leftJoin(otherInput, Function.identity(), (v1, v2) -> v2 == null ? v1 : v1 + v2,
-                                Named.as("join"), Materialized.with(Serdes.String(), Serdes.String()));
+                        input.leftJoin(otherInput, (k, v) -> v, (v1, v2) -> v2 == null ? v1 : v1 + v2,
+                                MaterializedX.with(Serdes.String(), Serdes.String()));
                 joined.toStream().to("output", ProducedX.with(Serdes.String(), Serdes.String()));
             }
         };
@@ -1646,7 +1768,7 @@ public void buildTopology(final StreamsBuilderX builder) {
     }
 
     @Test
-    void shouldLeftFKeyJoinTableJoinedUsingMaterialized() {
+    void shouldLeftFKeyJoinTableJoinedWithKeyUsingMaterialized() {
         final DoubleApp app = new DoubleApp() {
             @Override
             public void buildTopology(final StreamsBuilderX builder) {
@@ -1655,7 +1777,7 @@ public void buildTopology(final StreamsBuilderX builder) {
                 final KTableX<String, String> otherInput =
                         builder.table("other_input", ConsumedX.with(Serdes.String(), Serdes.String()));
                 final KTableX<String, String> joined =
-                        input.leftJoin(otherInput, Function.identity(), (v1, v2) -> v2 == null ? v1 : v1 + v2,
+                        input.leftJoin(otherInput, (k, v) -> v, (v1, v2) -> v2 == null ? v1 : v1 + v2,
                                 TableJoined.as("join"), MaterializedX.with(Serdes.String(), Serdes.String()));
                 joined.toStream().to("output", ProducedX.with(Serdes.String(), Serdes.String()));
             }
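The KTableXTest renames above track the Kafka 4.0 removal of the Named overloads for foreign-key table joins; TableJoined is now the only way to name these processors. A short sketch of the surviving overload on plain Kafka Streams types (topic and variable names are illustrative):

import java.util.function.Function;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TableJoined;

class FkJoinExample {
    // orders: value holds the foreign key; customers: keyed by that foreign key
    static KTable<String, String> join(final KTable<String, String> orders,
            final KTable<String, String> customers) {
        return orders.join(customers,
                Function.identity(),                    // extract the foreign key from the value
                (order, customer) -> order + customer,  // combine both values
                TableJoined.as("orders-to-customers")); // names processors and internal topics
    }
}

TableJoined can additionally carry partitioners for the subscription and response topics, which Named never could; that is presumably why the narrower overloads were dropped.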
diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ProducedXTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ProducedXTest.java
index 6b255cc06..9f65384eb 100644
--- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ProducedXTest.java
+++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/ProducedXTest.java
@@ -32,6 +32,8 @@
 import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;
 import com.bakdata.kafka.util.TopologyInformation;
 import java.util.List;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serdes;
@@ -221,7 +223,10 @@ void shouldUseStreamPartitioner() {
             public void buildTopology(final StreamsBuilderX builder) {
                 final KStreamX<String, String> input = builder.stream("input");
                 input.to("output",
-                        ProducedX.streamPartitioner((topic, key, value, numPartitions) -> "bar".equals(value) ? 0 : 1));
+                        ProducedX.streamPartitioner((topic, key, value, numPartitions) -> {
+                            final int partition = "bar".equals(value) ? 0 : 1;
+                            return Optional.of(Set.of(partition));
+                        }));
             }
         };
         try (final KafkaContainer kafkaCluster = KafkaTest.newCluster()) {
@@ -271,7 +276,10 @@ void shouldUseStreamPartitionerModifier() {
             public void buildTopology(final StreamsBuilderX builder) {
                 final KStreamX<String, String> input = builder.stream("input");
                 input.to("output", ProducedX.as("output")
-                        .withStreamPartitioner((topic, key, value, numPartitions) -> "bar".equals(value) ? 0 : 1));
+                        .withStreamPartitioner((topic, key, value, numPartitions) -> {
+                            final int partition = "bar".equals(value) ? 0 : 1;
+                            return Optional.of(Set.of(partition));
+                        }));
             }
         };
         try (final KafkaContainer kafkaCluster = KafkaTest.newCluster()) {
diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/RepartitionedXTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/RepartitionedXTest.java
index 7b026ddc1..c5bf84a21 100644
--- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/RepartitionedXTest.java
+++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/RepartitionedXTest.java
@@ -35,6 +35,8 @@
 import com.bakdata.kafka.util.TopicSettings;
 import com.bakdata.kafka.util.TopologyInformation;
 import java.util.List;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serdes;
@@ -234,7 +236,10 @@ public void buildTopology(final StreamsBuilderX builder) {
             final KStreamX<String, String> input = builder.stream("input");
             final KStreamX<String, String> repartitioned = input.repartition(RepartitionedX
                     .streamPartitioner(
-                            (topic, key, value, numPartitions) -> "bar".equals(value) ? 0 : 1)
+                            (topic, key, value, numPartitions) -> {
+                                final int partition = "bar".equals(value) ? 0 : 1;
+                                return Optional.of(Set.of(partition));
+                            })
                     .withNumberOfPartitions(2)
                     .withName("repartition"));
             repartitioned.to("output");
@@ -303,7 +308,10 @@ public void buildTopology(final StreamsBuilderX builder) {
             final KStreamX<String, String> repartitioned = input.repartition(
                     RepartitionedX.as("repartition")
                             .withStreamPartitioner(
-                                    (topic, key, value, numPartitions) -> "bar".equals(value) ? 0 : 1)
+                                    (topic, key, value, numPartitions) -> {
+                                        final int partition = "bar".equals(value) ? 0 : 1;
+                                        return Optional.of(Set.of(partition));
+                                    })
                             .withNumberOfPartitions(2));
             repartitioned.to("output");
         }
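Both partitioner tests above switch to the Kafka 4.0 StreamPartitioner contract: the single-partition partition() method is gone, and partitions() returns Optional<Set<Integer>>, which also allows broadcasting to several partitions. A minimal standalone sketch of the same value-based partitioner:

import java.util.Optional;
import java.util.Set;
import org.apache.kafka.streams.processor.StreamPartitioner;

class ValueBasedPartitioner implements StreamPartitioner<String, String> {
    @Override
    public Optional<Set<Integer>> partitions(final String topic, final String key, final String value,
            final int numPartitions) {
        // Optional.empty() would fall back to the default partitioner instead
        return Optional.of(Set.of("bar".equals(value) ? 0 : 1));
    }
}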
diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/SimpleLegacyProcessor.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/SimpleLegacyProcessor.java
deleted file mode 100644
index 951ad25a8..000000000
--- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/SimpleLegacyProcessor.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2025 bakdata
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.bakdata.kafka;
-
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-
-abstract class SimpleLegacyProcessor<K, V> implements Processor<K, V> {
-    private ProcessorContext context = null;
-
-    @Override
-    public void init(final ProcessorContext context) {
-        this.context = context;
-    }
-
-    @Override
-    public void close() {
-        // do nothing
-    }
-
-    protected <S extends StateStore> S getStateStore(final String name) {
-        return this.context.getStateStore(name);
-    }
-
-}
diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/SimpleTransformer.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/SimpleTransformer.java
deleted file mode 100644
index d458c68c9..000000000
--- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/SimpleTransformer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2025 bakdata
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.bakdata.kafka;
-
-import org.apache.kafka.streams.kstream.Transformer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-
-abstract class SimpleTransformer<K, V, R> implements Transformer<K, V, R> {
-    private ProcessorContext context = null;
-
-    @Override
-    public void init(final ProcessorContext context) {
-        this.context = context;
-    }
-
-    @Override
-    public void close() {
-        // do nothing
-    }
-
-    protected <S extends StateStore> S getStateStore(final String name) {
-        return this.context.getStateStore(name);
-    }
-
-}
diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/SimpleValueTransformer.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/SimpleValueTransformer.java
deleted file mode 100644
index 61d7c4ad9..000000000
--- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/SimpleValueTransformer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2025 bakdata
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.bakdata.kafka;
-
-import org.apache.kafka.streams.kstream.ValueTransformer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-
-abstract class SimpleValueTransformer<V, R> implements ValueTransformer<V, R> {
-    private ProcessorContext context = null;
-
-    @Override
-    public void init(final ProcessorContext context) {
-        this.context = context;
-    }
-
-    @Override
-    public void close() {
-        // do nothing
-    }
-
-    protected <S extends StateStore> S getStateStore(final String name) {
-        return this.context.getStateStore(name);
-    }
-
-}
diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/StreamsBuilderXTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/StreamsBuilderXTest.java
index c3248cc36..dae88eaa6 100644
--- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/StreamsBuilderXTest.java
+++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/StreamsBuilderXTest.java
@@ -29,6 +29,7 @@
 import java.util.Map;
 import java.util.regex.Pattern;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
@@ -523,7 +524,11 @@ public void buildTopology(final StreamsBuilderX builder) {
                 final KStreamX<String, String> input =
                         builder.stream("input", ConsumedX.with(Serdes.String(), Serdes.String()));
                 final GlobalKTable<String, String> otherInput =
-                        builder.globalTable("table_input", MaterializedX.with(Serdes.String(), Serdes.String()));
+                        builder.globalTable("table_input",
+                                // Kafka 4.0 likely contains a bug if Materialized does not have a name
+                                MaterializedX.<String, String, KeyValueStore<Bytes, byte[]>>as("store")
+                                        .withKeySerde(Serdes.String())
+                                        .withValueSerde(Serdes.String()));
                 final KStreamX<String, String> joined = input.join(otherInput, (k, v) -> k, (v1, v2) -> v1 + v2);
                 joined.to("output", ProducedX.with(Serdes.String(), Serdes.String()));
             }
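The StreamsBuilderXTest hunk above works around what the in-code comment suspects is a Kafka 4.0 bug with unnamed Materialized instances on global tables. Stated as a plain Kafka Streams sketch, the workaround is simply to name the backing store explicitly ("store" matches the test; the rest is illustrative):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

class NamedGlobalTable {
    static GlobalKTable<String, String> create(final StreamsBuilder builder) {
        return builder.globalTable("table_input",
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store") // explicit store name
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String()));
    }
}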
diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java
index 308e16750..5bb3ea2e8 100644
--- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java
+++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java
@@ -301,7 +301,6 @@ void shouldDeleteInternalTopics() {
                 uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition";
         final String backingTopic =
                 uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog";
-        final String manualTopic = ComplexTopologyApplication.THROUGH_TOPIC;
 
         try (final ImprovedAdminClient admin = testClient.admin();
                 final TopicClient topicClient = admin.getTopicClient()) {
@@ -310,7 +309,6 @@ void shouldDeleteInternalTopics() {
             }
             this.softly.assertThat(topicClient.exists(internalTopic)).isTrue();
             this.softly.assertThat(topicClient.exists(backingTopic)).isTrue();
-            this.softly.assertThat(topicClient.exists(manualTopic)).isTrue();
         }
 
         awaitClosed(executableApp);
@@ -323,7 +321,6 @@ void shouldDeleteInternalTopics() {
             }
             this.softly.assertThat(topicClient.exists(internalTopic)).isFalse();
             this.softly.assertThat(topicClient.exists(backingTopic)).isFalse();
-            this.softly.assertThat(topicClient.exists(manualTopic)).isTrue();
         }
     }
 
@@ -358,7 +355,7 @@ void shouldDeleteIntermediateTopics() {
         }
 
         awaitClosed(executableApp);
 
-        clean(executableApp);
+        reset(executableApp);
 
         try (final ImprovedAdminClient admin = testClient.admin();
                 final TopicClient topicClient = admin.getTopicClient()) {
@@ -531,14 +528,13 @@ void shouldDeleteSchemaOfInternalTopics()
                 uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition" + "-value";
         final String backingSubject =
                 uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog" + "-value";
-        final String manualSubject = ComplexTopologyApplication.THROUGH_TOPIC + "-value";
 
         this.softly.assertThat(client.getAllSubjects())
-                .contains(inputSubject, internalSubject, backingSubject, manualSubject);
+                .contains(inputSubject, internalSubject, backingSubject);
         reset(executableApp);
 
         this.softly.assertThat(client.getAllSubjects())
                 .doesNotContain(internalSubject, backingSubject)
-                .contains(inputSubject, manualSubject);
+                .contains(inputSubject);
     }
 
@@ -568,7 +564,7 @@ void shouldDeleteSchemaOfIntermediateTopics()
         final String manualSubject = ComplexTopologyApplication.THROUGH_TOPIC + "-value";
         this.softly.assertThat(client.getAllSubjects())
                 .contains(inputSubject, manualSubject);
-        clean(executableApp);
+        reset(executableApp);
 
         this.softly.assertThat(client.getAllSubjects())
                 .doesNotContain(manualSubject)
@@ -577,7 +573,7 @@ void shouldDeleteSchemaOfIntermediateTopics()
     }
 
     @Test
-    void shouldCallCleanupHookForInternalTopics() {
+    void shouldCallCleanupHookForInternalAndIntermediateTopics() {
         try (final ConfiguredStreamsApp<StreamsApp> app = this.createComplexCleanUpHookApplication();
                 final ExecutableStreamsApp<StreamsApp> executableApp = this.createExecutableApp(app,
                         this.createConfig())) {
@@ -586,6 +582,7 @@ void shouldCallCleanupHookForInternalTopics() {
             verify(this.topicHook).deleted(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition");
             verify(this.topicHook).deleted(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-changelog");
             verify(this.topicHook).deleted(uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog");
+            verify(this.topicHook).deleted(ComplexTopologyApplication.THROUGH_TOPIC);
             verify(this.topicHook).close();
             verifyNoMoreInteractions(this.topicHook);
         }
diff --git a/streams-bootstrap-core/src/test/resources/log4j2.xml b/streams-bootstrap-core/src/test/resources/log4j2.xml
index 0d4071ce2..dfbe33f8d 100644
--- a/streams-bootstrap-core/src/test/resources/log4j2.xml
+++ b/streams-bootstrap-core/src/test/resources/log4j2.xml
@@ -30,5 +30,14 @@
+
+
+
+
+
+
+
+
+
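The StreamsCleanUpRunnerTest changes shift intermediate topics from clean() to reset() semantics: the intermediate-topic tests now call reset(executableApp), the internal-topic test no longer asserts that the manually created THROUGH_TOPIC survives, and the renamed hook test expects that topic to reach the cleanup hook as well. A condensed sketch of that expectation, assuming the test's TopicHook mock and Mockito (names taken from the hunks above):

import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

void verifyCleanupHook(final TopicHook topicHook, final String uniqueAppId) {
    verify(topicHook).deleted(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition");
    verify(topicHook).deleted(uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog");
    verify(topicHook).deleted(ComplexTopologyApplication.THROUGH_TOPIC); // intermediate topic, new here
    verify(topicHook).close();
    verifyNoMoreInteractions(topicHook);
}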
diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ConsumerGroupVerifier.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ConsumerGroupVerifier.java
index 29a14cc24..7957020d2 100644
--- a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ConsumerGroupVerifier.java
+++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ConsumerGroupVerifier.java
@@ -36,7 +36,7 @@
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupState;
 import org.apache.kafka.common.TopicPartition;
 
 /**
@@ -61,30 +61,31 @@ public static ConsumerGroupVerifier verify(final ExecutableStreamsApp<?> app) {
     }
 
     /**
-     * Check whether consumer group has state {@link ConsumerGroupState#STABLE}
-     * @return true if consumer group has state {@link ConsumerGroupState#STABLE}
+     * Check whether consumer group has state {@link GroupState#STABLE}
+     * @return true if consumer group has state {@link GroupState#STABLE}
      */
     public boolean isActive() {
-        return this.getState() == ConsumerGroupState.STABLE;
+        return this.getState() == GroupState.STABLE;
     }
 
     /**
-     * Check whether consumer group has state {@link ConsumerGroupState#EMPTY}
-     * @return true if consumer group has state {@link ConsumerGroupState#EMPTY}
+     * Check whether consumer group has state {@link GroupState#EMPTY}
+     * @return true if consumer group has state {@link GroupState#EMPTY}
      */
     public boolean isClosed() {
-        return this.getState() == ConsumerGroupState.EMPTY;
+        return this.getState() == GroupState.EMPTY;
     }
 
     /**
      * Get current state of consumer group
+     *
      * @return current state of consumer group
      */
-    public ConsumerGroupState getState() {
+    public GroupState getState() {
         try (final ImprovedAdminClient admin = this.adminClientSupplier.get();
                 final ConsumerGroupClient consumerGroupClient = admin.getConsumerGroupClient()) {
             final ConsumerGroupDescription description = consumerGroupClient.describe(this.group);
-            final ConsumerGroupState state = description.state();
+            final GroupState state = description.groupState();
             log.debug("Consumer group '{}' has state {}", this.group, state);
             return state;
         }
diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/KafkaTestClient.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/KafkaTestClient.java
index 071d80ea6..f822e3fe4 100644
--- a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/KafkaTestClient.java
+++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/KafkaTestClient.java
@@ -34,7 +34,7 @@
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
 
 /**
  * Client that supports communication with Kafka clusters in test setups, including topic management, reading from
@@ -65,12 +65,12 @@ public SenderBuilder send() {
 
     /**
      * Prepare reading data from the cluster. {@link ConsumerConfig#AUTO_OFFSET_RESET_CONFIG} is configured to
-     * {@link OffsetResetStrategy#EARLIEST}
+     * {@link AutoOffsetResetStrategy#EARLIEST}
      * @return configured {@code ReaderBuilder}
     */
     public ReaderBuilder read() {
         return new ReaderBuilder(this.configuration.createKafkaProperties())
-                .with(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString());
+                .with(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AutoOffsetResetStrategy.EARLIEST.type().toString());
     }
 
     /**
diff --git a/streams-bootstrap-test/src/test/resources/log4j2.xml b/streams-bootstrap-test/src/test/resources/log4j2.xml
index 0d4071ce2..dfbe33f8d 100644
--- a/streams-bootstrap-test/src/test/resources/log4j2.xml
+++ b/streams-bootstrap-test/src/test/resources/log4j2.xml
@@ -30,5 +30,14 @@
+
+
+
+
+
+
+
+
+
diff --git a/streams-bootstrap-test/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java b/streams-bootstrap-test/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java
index 1234f7320..1dc530bd7 100644
--- a/streams-bootstrap-test/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java
+++ b/streams-bootstrap-test/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java
@@ -46,7 +46,7 @@ public abstract class KafkaTest {
     private final KafkaContainer kafkaCluster = newCluster();
 
     public static KafkaContainer newCluster() {
-        return new KafkaContainer(DockerImageName.parse("apache/kafka-native")
+        return new KafkaContainer(DockerImageName.parse("apache/kafka") //FIXME native image is flaky
                 .withTag(AppInfoParser.getVersion()));
     }
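The last three hunks adapt the test utilities to Kafka 4.0 client APIs: GroupState supersedes ConsumerGroupState (ConsumerGroupDescription#state() gave way to groupState()), AutoOffsetResetStrategy.EARLIEST.type().toString() still yields the literal "earliest" for AUTO_OFFSET_RESET_CONFIG, and the Testcontainers image falls back from apache/kafka-native to apache/kafka. A minimal sketch of the new group-state check against a plain AdminClient (bootstrap address and group id are placeholders):

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.GroupState;

final class GroupStateCheck {
    static boolean isStable(final String group) throws InterruptedException, ExecutionException {
        try (final Admin admin = Admin.create(
                Map.<String, Object>of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            final GroupState state = admin.describeConsumerGroups(List.of(group))
                    .describedGroups()
                    .get(group)
                    .get()
                    .groupState(); // replaces ConsumerGroupDescription#state()
            return state == GroupState.STABLE;
        }
    }
}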