diff --git a/bin/run-wikipedia-async-application.sh b/bin/run-wikipedia-async-application.sh
new file mode 100755
index 00000000..43bd8374
--- /dev/null
+++ b/bin/run-wikipedia-async-application.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+home_dir=`pwd`
+base_dir=$(dirname $0)/..
+cd $base_dir
+base_dir=`pwd`
+cd $home_dir
+
+export EXECUTION_PLAN_DIR="$base_dir/plan"
+mkdir -p $EXECUTION_PLAN_DIR
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+exec $(dirname $0)/run-class.sh samza.examples.wikipedia.application.WikipediaAsyncApplication --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-async-application.properties
diff --git a/gradle.properties b/gradle.properties
index 283cc4a3..a4395dd6 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -18,7 +18,7 @@
*/
SAMZA_VERSION=1.2.0
-KAFKA_VERSION=0.11.0.2
+KAFKA_VERSION=2.1.1
HADOOP_VERSION=2.6.1
SLF4J_VERSION = 1.7.7
diff --git a/pom.xml b/pom.xml
index bef75f09..0cbe1053 100644
--- a/pom.xml
+++ b/pom.xml
@@ -139,7 +139,7 @@ under the License.
org.apache.kafka
kafka_2.11
- 0.11.0.2
+ 2.1.1
org.schwering
diff --git a/src/main/config/wikipedia-async-application.properties b/src/main/config/wikipedia-async-application.properties
new file mode 100644
index 00000000..203b29be
--- /dev/null
+++ b/src/main/config/wikipedia-async-application.properties
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.name=wikipedia-async-application
+job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
+job.default.system=kafka
+job.coordinator.zk.connect=localhost:2181
+
+# Task/Application
+task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
+
+# Serializers
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
+
+# Wikipedia System
+systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory
+systems.wikipedia.host=irc.wikimedia.org
+systems.wikipedia.port=6667
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.default.stream.replication.factor=1
+
+# Streams
+streams.en-wikipedia.samza.system=wikipedia
+streams.en-wikipedia.samza.physical.name=#en.wikipedia
+
+streams.en-wiktionary.samza.system=wikipedia
+streams.en-wiktionary.samza.physical.name=#en.wiktionary
+
+streams.en-wikinews.samza.system=wikipedia
+streams.en-wikinews.samza.physical.name=#en.wikinews
+
+task.max.concurrency=20
+
+app.class=samza.examples.wikipedia.application.WikipediaAsyncApplication
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.container.count=1
+
+yarn.package.path=file:///Users/bkumaras/workspace-common/hello-samza/target/hello-samza-1.0.1-SNAPSHOT-dist.tar.gz
\ No newline at end of file
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaAsyncApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaAsyncApplication.java
new file mode 100644
index 00000000..27697588
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaAsyncApplication.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.application;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import joptsimple.OptionSet;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.functions.AsyncFlatMapFunction;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
+import org.apache.samza.util.CommandLine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
+import samza.examples.wikipedia.system.descriptors.WikipediaInputDescriptor;
+import samza.examples.wikipedia.system.descriptors.WikipediaSystemDescriptor;
+
+
+/**
+ * This {@link StreamApplication} demonstrates the Samza fluent API by performing the same operations as
+ * {@link samza.examples.wikipedia.task.WikipediaFeedStreamTask},
+ * {@link samza.examples.wikipedia.task.WikipediaParserStreamTask}, and
+ * {@link samza.examples.wikipedia.task.WikipediaStatsStreamTask} in one expression.
+ *
+ * The only functional difference is the lack of "wikipedia-raw" and "wikipedia-edits"
+ * streams to connect the operators, as they are not needed with the fluent API.
+ *
+ * The application processes Wikipedia events in the following steps:
+ *
+ * - Merge wikipedia, wiktionary, and wikinews events into one stream
+ * - Parse each event to a more structured format
+ * - Aggregate some stats over a 10s window
+ * - Format each window output for public consumption
+ * - Send the window output to Kafka
+ *
+ *
+ */
+public class WikipediaAsyncApplication implements StreamApplication {
+ private static final Logger log = LoggerFactory.getLogger(WikipediaAsyncApplication.class);
+
+ private static final List KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181");
+ private static final List KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092");
+ private static final Map KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1");
+
+ public static final String WIKIPEDIA_CHANNEL = "#en.wikipedia";
+ public static final String WIKINEWS_CHANNEL = "#en.wikinews";
+ public static final String WIKTIONARY_CHANNEL = "#en.wiktionary";
+
+ @Override
+ @SuppressWarnings("Duplicates")
+ public void describe(StreamApplicationDescriptor appDescriptor) {
+ // Define a SystemDescriptor for Wikipedia data
+ WikipediaSystemDescriptor wikipediaSystemDescriptor = new WikipediaSystemDescriptor("irc.wikimedia.org", 6667);
+
+ // Define InputDescriptors for consuming wikipedia data
+ WikipediaInputDescriptor wikipediaInputDescriptor = wikipediaSystemDescriptor
+ .getInputDescriptor("en-wikipedia")
+ .withChannel(WIKIPEDIA_CHANNEL);
+ WikipediaInputDescriptor wiktionaryInputDescriptor = wikipediaSystemDescriptor
+ .getInputDescriptor("en-wiktionary")
+ .withChannel(WIKTIONARY_CHANNEL);
+ WikipediaInputDescriptor wikiNewsInputDescriptor = wikipediaSystemDescriptor
+ .getInputDescriptor("en-wikinews")
+ .withChannel(WIKINEWS_CHANNEL);
+
+ // Define a system descriptor for Kafka
+ KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor("kafka")
+ .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
+ .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
+ .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
+
+ // Define an output descriptor
+ KafkaOutputDescriptor statsOutputDescriptor =
+ kafkaSystemDescriptor.getOutputDescriptor("wikipedia-stats-3", new JsonSerdeV2<>(
+ WikipediaFeedEvent.class));
+
+ // Messages come from WikipediaConsumer so we know the type is WikipediaFeedEvent
+ MessageStream wikipediaEvents = appDescriptor.getInputStream(wikipediaInputDescriptor);
+ MessageStream wiktionaryEvents = appDescriptor.getInputStream(wiktionaryInputDescriptor);
+ MessageStream wikiNewsEvents = appDescriptor.getInputStream(wikiNewsInputDescriptor);
+
+ // Output (also un-keyed)
+ OutputStream wikipediaStats =
+ appDescriptor.getOutputStream(statsOutputDescriptor);
+
+ // Merge inputs
+ MessageStream allWikipediaEvents =
+ MessageStream.mergeAll(ImmutableList.of(wikipediaEvents, wiktionaryEvents, wikiNewsEvents));
+
+ // Parse, update stats, prepare output, and send
+ allWikipediaEvents
+ .flatMapAsync(new MyAsyncFlatMapFunction())
+ .sendTo(wikipediaStats);
+ }
+
+ /**
+ * A sample async map function to mimic asynchronous behavior in the pipeline.
+ * In a real world example this would be replaced with remote IO.
+ */
+ static class MyAsyncFlatMapFunction implements AsyncFlatMapFunction {
+ @Override
+ public CompletionStage> apply(WikipediaFeedEvent wikipediaFeedEvent) {
+ return CompletableFuture.supplyAsync(() -> {
+ log.debug("Executing filter function for {}", wikipediaFeedEvent.getChannel());
+ boolean res;
+ try {
+ Thread.sleep((long) (Math.random() * 10000));
+ res = Math.random() > 0.5;
+ } catch(InterruptedException ec) {
+ res = false;
+ }
+
+ log.debug("Finished executing filter function for {} with result {}.", wikipediaFeedEvent.getChannel(), res);
+
+ return res ? Collections.singleton(wikipediaFeedEvent) : Collections.emptyList();
+ });
+ }
+ }
+
+
+ /**
+ * Executes the application using the local application runner.
+ * It takes two required command line arguments
+ * config-factory: a fully {@link org.apache.samza.config.factories.PropertiesConfigFactory} class name
+ * config-path: path to application properties
+ *
+ * @param args command line arguments
+ */
+ public static void main(String[] args) {
+ CommandLine cmdLine = new CommandLine();
+ OptionSet options = cmdLine.parser().parse(args);
+ Config config = cmdLine.loadConfig(options);
+
+ WikipediaAsyncApplication app = new WikipediaAsyncApplication();
+ LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
+ runner.run();
+ runner.waitForFinish();
+ }
+}
+