diff --git a/build.gradle.kts b/build.gradle.kts
index c6f7a6d36..74268e8f3 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -11,7 +11,7 @@ allprojects {
repositories {
mavenCentral()
maven(url = "https://packages.confluent.io/maven/")
- maven(url = "https://s01.oss.sonatype.org/content/repositories/snapshots")
+ maven(url = "https://central.sonatype.com/repository/maven-snapshots")
}
}
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 2b3826a2b..4fbbec855 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -4,7 +4,7 @@ mockito = "5.17.0"
testcontainers = "1.21.0"
[libraries]
-kafka-streams-utils = { group = "com.bakdata.kafka", name = "kafka-streams-utils", version = "1.1.1" }
+kafka-streams-utils = { group = "com.bakdata.kafka", name = "kafka-streams-utils", version = "1.2.0" }
kafka-tools = { group = "org.apache.kafka", name = "kafka-tools" }
kafka-streams = { group = "org.apache.kafka", name = "kafka-streams" }
kafka-clients = { group = "org.apache.kafka", name = "kafka-clients" }
diff --git a/streams-bootstrap-cli-test/src/test/resources/log4j2.xml b/streams-bootstrap-cli-test/src/test/resources/log4j2.xml
index a3089a43a..aed046ea1 100644
--- a/streams-bootstrap-cli-test/src/test/resources/log4j2.xml
+++ b/streams-bootstrap-cli-test/src/test/resources/log4j2.xml
@@ -27,6 +27,15 @@
+
+
+
+
+
+
+
+
+
diff --git a/streams-bootstrap-cli/src/test/resources/log4j2.xml b/streams-bootstrap-cli/src/test/resources/log4j2.xml
index 0d4071ce2..dfbe33f8d 100644
--- a/streams-bootstrap-cli/src/test/resources/log4j2.xml
+++ b/streams-bootstrap-cli/src/test/resources/log4j2.xml
@@ -30,5 +30,14 @@
+
+
+
+
+
+
+
+
+
diff --git a/streams-bootstrap-core/build.gradle.kts b/streams-bootstrap-core/build.gradle.kts
index a608ea643..4b7b1de85 100644
--- a/streams-bootstrap-core/build.gradle.kts
+++ b/streams-bootstrap-core/build.gradle.kts
@@ -7,9 +7,7 @@ plugins {
dependencies {
api(libs.kafka.streams.utils)
- implementation(libs.kafka.tools) {
- exclude(group = "org.slf4j", module = "slf4j-reload4j")
- }
+ implementation(libs.kafka.tools)
api(libs.kafka.streams)
api(libs.kafka.clients)
diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java
index 13c3d86ff..915f8bb07 100644
--- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java
+++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredProducerApp.java
@@ -32,6 +32,7 @@
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.record.CompressionType;
/**
* A {@link ProducerApp} with a corresponding {@link ProducerTopicConfig}
@@ -50,7 +51,7 @@ private static Map createBaseConfig() {
kafkaConfig.put(ProducerConfig.ACKS_CONFIG, "all");
// compression
- kafkaConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
+ kafkaConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.GZIP.toString());
return kafkaConfig;
}
diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java
index 14a1dfa91..a139678d8 100644
--- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java
+++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConfiguredStreamsApp.java
@@ -31,6 +31,7 @@
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
@@ -54,7 +55,8 @@ private static Map createBaseConfig() {
kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
// compression
- kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "gzip");
+ kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG),
+ CompressionType.GZIP.toString());
return kafkaConfig;
}
diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConsumedX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConsumedX.java
index e530ba3ed..02baaff91 100644
--- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConsumedX.java
+++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConsumedX.java
@@ -26,7 +26,8 @@
import java.util.function.Function;
import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.Topology.AutoOffsetReset;
+import org.apache.kafka.streams.AutoOffsetReset;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.TimestampExtractor;
@@ -109,6 +110,14 @@ public static ConsumedX with(final TimestampExtractor timestampExtr
return new ConsumedX<>(configurator -> Consumed.with(timestampExtractor));
}
+ /**
+ * @see Consumed#with(Topology.AutoOffsetReset)
+ */
+ @Deprecated(since = "5.0.0")
+ public static ConsumedX with(final Topology.AutoOffsetReset resetPolicy) {
+ return new ConsumedX<>(configurator -> Consumed.with(resetPolicy));
+ }
+
/**
* @see Consumed#with(AutoOffsetReset)
*/
@@ -159,6 +168,14 @@ public ConsumedX withOffsetResetPolicy(final AutoOffsetReset offsetResetPo
return this.modify(consumed -> consumed.withOffsetResetPolicy(offsetResetPolicy));
}
+ /**
+ * @see Consumed#withOffsetResetPolicy(Topology.AutoOffsetReset)
+ */
+ @Deprecated(since = "5.0.0")
+ public ConsumedX withOffsetResetPolicy(final Topology.AutoOffsetReset offsetResetPolicy) {
+ return this.modify(consumed -> consumed.withOffsetResetPolicy(offsetResetPolicy));
+ }
+
/**
* @see Consumed#withTimestampExtractor(TimestampExtractor)
*/
diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KStreamX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KStreamX.java
index 86082a0f4..160cfc36e 100644
--- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KStreamX.java
+++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KStreamX.java
@@ -40,13 +40,10 @@
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.kstream.StreamJoined;
-import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
@@ -479,14 +476,6 @@ KErrorStreamX flatMapValuesCapturingErrors(
@Override
KStreamX peek(ForeachAction super K, ? super V> action, Named named);
- @Deprecated
- @Override
- KStreamX[] branch(Named named, Predicate super K, ? super V>... predicates);
-
- @Deprecated
- @Override
- KStreamX[] branch(Predicate super K, ? super V>... predicates);
-
@Override
BranchedKStreamX split();
@@ -499,14 +488,6 @@ KErrorStreamX flatMapValuesCapturingErrors(
@Override
KStreamX merge(KStream stream, Named named);
- @Deprecated
- @Override
- KStreamX through(String topic);
-
- @Deprecated
- @Override
- KStreamX through(String topic, Produced produced);
-
@Override
KStreamX repartition();
@@ -833,78 +814,6 @@ KStreamX leftJoin(GlobalKTable globalTable,
KeyValueMapper super K, ? super V, ? extends GK> keySelector,
ValueJoinerWithKey super K, ? super V, ? super GV, ? extends RV> valueJoiner, Named named);
- @Deprecated
- @Override
- KStreamX transform(
- TransformerSupplier super K, ? super V, KeyValue> transformerSupplier,
- String... stateStoreNames);
-
- @Deprecated
- @Override
- KStreamX transform(
- TransformerSupplier super K, ? super V, KeyValue> transformerSupplier,
- Named named, String... stateStoreNames);
-
- @Deprecated
- @Override
- KStreamX flatTransform(
- TransformerSupplier super K, ? super V, Iterable>> transformerSupplier,
- String... stateStoreNames);
-
- @Deprecated
- @Override
- KStreamX flatTransform(
- TransformerSupplier super K, ? super V, Iterable>> transformerSupplier, Named named,
- String... stateStoreNames);
-
- @Deprecated
- @Override
- KStreamX transformValues(
- ValueTransformerSupplier super V, ? extends VR> valueTransformerSupplier,
- String... stateStoreNames);
-
- @Deprecated
- @Override
- KStreamX transformValues(
- ValueTransformerSupplier super V, ? extends VR> valueTransformerSupplier,
- Named named, String... stateStoreNames);
-
- @Deprecated
- @Override
- KStreamX transformValues(
- ValueTransformerWithKeySupplier super K, ? super V, ? extends VR> valueTransformerSupplier,
- String... stateStoreNames);
-
- @Deprecated
- @Override
- KStreamX transformValues(
- ValueTransformerWithKeySupplier super K, ? super V, ? extends VR> valueTransformerSupplier, Named named,
- String... stateStoreNames);
-
- @Deprecated
- @Override
- KStreamX flatTransformValues(
- ValueTransformerSupplier super V, Iterable> valueTransformerSupplier,
- String... stateStoreNames);
-
- @Deprecated
- @Override
- KStreamX flatTransformValues(
- ValueTransformerSupplier super V, Iterable> valueTransformerSupplier,
- Named named, String... stateStoreNames);
-
- @Deprecated
- @Override
- KStreamX flatTransformValues(
- ValueTransformerWithKeySupplier super K, ? super V, Iterable> valueTransformerSupplier,
- String... stateStoreNames);
-
- @Deprecated
- @Override
- KStreamX flatTransformValues(
- ValueTransformerWithKeySupplier super K, ? super V, Iterable> valueTransformerSupplier, Named named,
- String... stateStoreNames);
-
@Override
KStreamX process(
ProcessorSupplier super K, ? super V, KOut, VOut> processorSupplier,
diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KStreamXImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KStreamXImpl.java
index b815ff320..bf307c8a1 100644
--- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KStreamXImpl.java
+++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KStreamXImpl.java
@@ -24,7 +24,6 @@
package com.bakdata.kafka;
-import java.util.Arrays;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
@@ -46,13 +45,10 @@
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.kstream.StreamJoined;
-import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
@@ -369,20 +365,6 @@ public KStreamX peek(final ForeachAction super K, ? super V> action, fin
return this.context.wrap(this.wrapped.peek(action, named));
}
- @Override
- public KStreamX[] branch(final Predicate super K, ? super V>... predicates) {
- return Arrays.stream(this.wrapped.branch(predicates))
- .map(this.context::wrap)
- .toArray(KStreamX[]::new);
- }
-
- @Override
- public KStreamX[] branch(final Named named, final Predicate super K, ? super V>... predicates) {
- return Arrays.stream(this.wrapped.branch(named, predicates))
- .map(this.context::wrap)
- .toArray(KStreamX[]::new);
- }
-
@Override
public BranchedKStreamX split() {
return this.context.wrap(this.wrapped.split());
@@ -405,16 +387,6 @@ public KStreamX merge(final KStream stream, final Named named) {
return this.context.wrap(this.wrapped.merge(other, named));
}
- @Override
- public KStreamX through(final String topic) {
- return this.context.wrap(this.wrapped.through(topic));
- }
-
- @Override
- public KStreamX through(final String topic, final Produced produced) {
- return this.context.wrap(this.wrapped.through(topic, produced));
- }
-
@Override
public KStreamX repartition() {
return this.context.wrap(this.wrapped.repartition());
@@ -844,109 +816,6 @@ public KStreamX leftJoin(final GlobalKTable globalTa
return this.context.wrap(this.wrapped.leftJoin(globalTable, keySelector, valueJoiner, named));
}
- @Override
- public KStreamX transform(
- final TransformerSupplier super K, ? super V, KeyValue> transformerSupplier,
- final String... stateStoreNames) {
- return this.context.wrap(this.wrapped.transform(transformerSupplier, stateStoreNames));
- }
-
- @Override
- public KStreamX transform(
- final TransformerSupplier super K, ? super V, KeyValue> transformerSupplier, final Named named,
- final String... stateStoreNames) {
- return this.context.wrap(this.wrapped.transform(transformerSupplier, named, stateStoreNames));
- }
-
- @Override
- public KStreamX flatTransform(
- final TransformerSupplier super K, ? super V, Iterable>> transformerSupplier,
- final String... stateStoreNames) {
- return this.context.wrap(this.wrapped.flatTransform(transformerSupplier, stateStoreNames));
- }
-
- @Override
- public KStreamX flatTransform(
- final TransformerSupplier super K, ? super V, Iterable>> transformerSupplier,
- final Named named,
- final String... stateStoreNames) {
- return this.context.wrap(this.wrapped.flatTransform(transformerSupplier, named, stateStoreNames));
- }
-
- @Override
- public KStreamX transformValues(
- final ValueTransformerSupplier super V, ? extends VR> valueTransformerSupplier,
- final String... stateStoreNames) {
- return this.context.wrap(this.wrapped.transformValues(valueTransformerSupplier, stateStoreNames));
- }
-
- @Override
- public KStreamX transformValues(
- final ValueTransformerSupplier super V, ? extends VR> valueTransformerSupplier, final Named named,
- final String... stateStoreNames) {
- return this.context.wrap(this.wrapped.transformValues(valueTransformerSupplier, named, stateStoreNames));
- }
-
- @Override
- public KStreamX transformValues(
- final ValueTransformerWithKeySupplier super K, ? super V, ? extends VR> valueTransformerSupplier,
- final String... stateStoreNames) {
- return this.context.wrap(this.wrapped.transformValues(valueTransformerSupplier, stateStoreNames));
- }
-
- @Override
- public KStreamX transformValues(
- final ValueTransformerWithKeySupplier super K, ? super V, ? extends VR> valueTransformerSupplier,
- final Named named,
- final String... stateStoreNames) {
- return this.context.wrap(this.wrapped.transformValues(valueTransformerSupplier, named, stateStoreNames));
- }
-
- @Override
- public KStreamX flatTransformValues(
- final ValueTransformerSupplier super V, Iterable> valueTransformerSupplier,
- final String... stateStoreNames) {
- return this.context.wrap(this.wrapped.flatTransformValues(valueTransformerSupplier, stateStoreNames));
- }
-
- @Override
- public KStreamX flatTransformValues(
- final ValueTransformerSupplier super V, Iterable> valueTransformerSupplier, final Named named,
- final String... stateStoreNames) {
- return this.context.wrap(
- this.wrapped.flatTransformValues(valueTransformerSupplier, named, stateStoreNames));
- }
-
- @Override
- public KStreamX flatTransformValues(
- final ValueTransformerWithKeySupplier super K, ? super V, Iterable> valueTransformerSupplier,
- final String... stateStoreNames) {
- return this.context.wrap(this.wrapped.flatTransformValues(valueTransformerSupplier, stateStoreNames));
- }
-
- @Override
- public KStreamX flatTransformValues(
- final ValueTransformerWithKeySupplier super K, ? super V, Iterable> valueTransformerSupplier,
- final Named named,
- final String... stateStoreNames) {
- return this.context.wrap(
- this.wrapped.flatTransformValues(valueTransformerSupplier, named, stateStoreNames));
- }
-
- @Override
- public void process(
- final org.apache.kafka.streams.processor.ProcessorSupplier super K, ? super V> processorSupplier,
- final String... stateStoreNames) {
- this.wrapped.process(processorSupplier, stateStoreNames);
- }
-
- @Override
- public void process(
- final org.apache.kafka.streams.processor.ProcessorSupplier super K, ? super V> processorSupplier,
- final Named named, final String... stateStoreNames) {
- this.wrapped.process(processorSupplier, named, stateStoreNames);
- }
-
@Override
public KStreamX process(
final ProcessorSupplier super K, ? super V, KOut, VOut> processorSupplier,
diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KTableX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KTableX.java
index ac6377d60..3cbe9ec06 100644
--- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KTableX.java
+++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/KTableX.java
@@ -24,6 +24,7 @@
package com.bakdata.kafka;
+import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
@@ -313,11 +314,6 @@ KTableX outerJoin(KTable other,
KTableX join(KTable other, Function foreignKeyExtractor,
ValueJoiner joiner);
- @Deprecated
- @Override
- KTableX join(KTable other, Function foreignKeyExtractor,
- ValueJoiner joiner, Named named);
-
@Override
KTableX join(KTable other, Function foreignKeyExtractor,
ValueJoiner joiner, TableJoined tableJoined);
@@ -332,11 +328,6 @@ KTableX join(KTable other, Function foreignKe
KTableX join(KTable other, Function foreignKeyExtractor,
ValueJoiner joiner, MaterializedX> materialized);
- @Deprecated
- @Override
- KTableX join(KTable other, Function foreignKeyExtractor,
- ValueJoiner joiner, Named named, Materialized> materialized);
-
@Override
KTableX join(KTable other, Function foreignKeyExtractor,
ValueJoiner joiner, TableJoined tableJoined,
@@ -353,11 +344,6 @@ KTableX join(KTable other, Function foreignKe
KTableX leftJoin(KTable other, Function foreignKeyExtractor,
ValueJoiner joiner);
- @Deprecated
- @Override
- KTableX leftJoin(KTable other, Function foreignKeyExtractor,
- ValueJoiner joiner, Named named);
-
@Override
KTableX leftJoin(KTable other, Function foreignKeyExtractor,
ValueJoiner joiner, TableJoined tableJoined);
@@ -372,11 +358,6 @@ KTableX leftJoin(KTable other, Function forei
KTableX leftJoin(KTable other, Function foreignKeyExtractor,
ValueJoiner joiner, MaterializedX> materialized);
- @Deprecated
- @Override
- KTableX leftJoin(KTable other, Function foreignKeyExtractor,
- ValueJoiner joiner, Named named, Materialized> materialized);
-
@Override
KTableX leftJoin(KTable other, Function foreignKeyExtractor,
ValueJoiner joiner, TableJoined tableJoined,
@@ -388,4 +369,64 @@ KTableX leftJoin(KTable other, Function forei
KTableX leftJoin(KTable other, Function foreignKeyExtractor,
ValueJoiner joiner, TableJoined tableJoined,
MaterializedX> materialized);
+
+ @Override
+ KTableX join(KTable other, BiFunction foreignKeyExtractor,
+ ValueJoiner joiner);
+
+ @Override
+ KTableX join(KTable other, BiFunction foreignKeyExtractor,
+ ValueJoiner joiner, TableJoined tableJoined);
+
+ @Override
+ KTableX join(KTable other, BiFunction foreignKeyExtractor,
+ ValueJoiner joiner, Materialized> materialized);
+
+ /**
+ * @see #join(KTable, BiFunction, ValueJoiner, Materialized)
+ */
+ KTableX join(KTable other, BiFunction foreignKeyExtractor,
+ ValueJoiner joiner, MaterializedX> materialized);
+
+ @Override
+ KTableX join(KTable other, BiFunction foreignKeyExtractor,
+ ValueJoiner joiner, TableJoined tableJoined,
+ Materialized> materialized);
+
+ /**
+ * @see #join(KTable, BiFunction, ValueJoiner, TableJoined, Materialized)
+ */
+ KTableX join(KTable other, BiFunction foreignKeyExtractor,
+ ValueJoiner joiner, TableJoined tableJoined,
+ MaterializedX> materialized);
+
+ @Override
+ KTableX leftJoin(KTable other, BiFunction foreignKeyExtractor,
+ ValueJoiner joiner);
+
+ @Override
+ KTableX leftJoin(KTable other, BiFunction foreignKeyExtractor,
+ ValueJoiner joiner, TableJoined tableJoined);
+
+ @Override
+ KTableX leftJoin(KTable other, BiFunction foreignKeyExtractor,
+ ValueJoiner joiner, Materialized> materialized);
+
+ /**
+ * @see #leftJoin(KTable, BiFunction, ValueJoiner, Materialized)
+ */
+