diff --git a/bin/run-wikipedia-async-application.sh b/bin/run-wikipedia-async-application.sh
new file mode 100755
index 00000000..43bd8374
--- /dev/null
+++ b/bin/run-wikipedia-async-application.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Launches WikipediaAsyncApplication through run-class.sh using the properties config.
+# The install root is resolved in a subshell so the caller's working directory is untouched.
+bin_dir=$(dirname "$0")
+base_dir=$(cd "$bin_dir/.." && pwd) || exit 1
+
+export EXECUTION_PLAN_DIR="$base_dir/plan"
+mkdir -p "$EXECUTION_PLAN_DIR"
+
+# Point log4j at the console config next to this script unless JAVA_OPTS already sets one.
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$bin_dir/log4j-console.xml"
+# The config path is resolved against the caller's working directory ($PWD), not $base_dir.
+exec "$bin_dir/run-class.sh" samza.examples.wikipedia.application.WikipediaAsyncApplication --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path="file://$PWD/deploy/samza/config/wikipedia-async-application.properties"
diff --git a/gradle.properties b/gradle.properties index 283cc4a3..a4395dd6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -18,7 +18,7 @@ */ SAMZA_VERSION=1.2.0 -KAFKA_VERSION=0.11.0.2 +KAFKA_VERSION=2.1.1 HADOOP_VERSION=2.6.1 SLF4J_VERSION = 1.7.7 diff --git a/pom.xml b/pom.xml index bef75f09..0cbe1053 100644 --- a/pom.xml +++ b/pom.xml @@ -139,7 +139,7 @@ under the License.
org.apache.kafka kafka_2.11 - 0.11.0.2 + 2.1.1 org.schwering diff --git a/src/main/config/wikipedia-async-application.properties b/src/main/config/wikipedia-async-application.properties new file mode 100644 index 00000000..203b29be --- /dev/null +++ b/src/main/config/wikipedia-async-application.properties @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+
+# Job
+job.name=wikipedia-async-application
+job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
+job.default.system=kafka
+job.coordinator.zk.connect=localhost:2181
+
+# Task/Application
+task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
+
+# Serializers
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
+
+# Wikipedia System
+systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory
+systems.wikipedia.host=irc.wikimedia.org
+systems.wikipedia.port=6667
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.default.stream.replication.factor=1
+
+# Streams
+streams.en-wikipedia.samza.system=wikipedia
+streams.en-wikipedia.samza.physical.name=#en.wikipedia
+
+streams.en-wiktionary.samza.system=wikipedia
+streams.en-wiktionary.samza.physical.name=#en.wiktionary
+
+streams.en-wikinews.samza.system=wikipedia
+streams.en-wikinews.samza.physical.name=#en.wikinews
+
+# Application and deployment. task.max.concurrency limits how many messages a task processes concurrently.
+task.max.concurrency=20
+app.class=samza.examples.wikipedia.application.WikipediaAsyncApplication
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.container.count=1
+# NOTE(review): replaces a developer-specific absolute path; assumes Maven filtering expands the ${...} placeholders - confirm.
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaAsyncApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaAsyncApplication.java new file mode 100644 index 00000000..27697588 --- /dev/null +++ b/src/main/java/samza/examples/wikipedia/application/WikipediaAsyncApplication.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package samza.examples.wikipedia.application; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import joptsimple.OptionSet; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.descriptors.StreamApplicationDescriptor; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.functions.AsyncFlatMapFunction; +import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor; +import org.apache.samza.util.CommandLine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent; +import samza.examples.wikipedia.system.descriptors.WikipediaInputDescriptor; +import 
samza.examples.wikipedia.system.descriptors.WikipediaSystemDescriptor;
+
+
+/**
+ * This {@link StreamApplication} demonstrates asynchronous processing with the Samza fluent API.
+ *
+ * The application processes Wikipedia events in the following steps:
+ * <ul>
+ *   <li>Consume the wikipedia, wiktionary and wikinews channels through the Wikipedia system</li>
+ *   <li>Merge the three inputs into one stream</li>
+ *   <li>Pass every event through {@link MyAsyncFlatMapFunction}, which asynchronously keeps or drops it</li>
+ *   <li>Send the surviving events as JSON to the "wikipedia-stats-3" Kafka stream</li>
+ * </ul>
+ */
+public class WikipediaAsyncApplication implements StreamApplication {
+  private static final Logger log = LoggerFactory.getLogger(WikipediaAsyncApplication.class);
+
+  private static final List<String> KAFKA_CONSUMER_ZK_CONNECT = ImmutableList.of("localhost:2181");
+  private static final List<String> KAFKA_PRODUCER_BOOTSTRAP_SERVERS = ImmutableList.of("localhost:9092");
+  private static final Map<String, String> KAFKA_DEFAULT_STREAM_CONFIGS = ImmutableMap.of("replication.factor", "1");
+
+  public static final String WIKIPEDIA_CHANNEL = "#en.wikipedia";
+  public static final String WIKINEWS_CHANNEL = "#en.wikinews";
+  public static final String WIKTIONARY_CHANNEL = "#en.wiktionary";
+
+  /**
+   * Wires the three Wikipedia inputs, the async flat-map stage and the Kafka output together.
+   *
+   * @param appDescriptor descriptor on which the input and output streams are registered
+   */
+  @Override
+  @SuppressWarnings("Duplicates")
+  public void describe(StreamApplicationDescriptor appDescriptor) {
+    // Define a SystemDescriptor for Wikipedia data
+    WikipediaSystemDescriptor wikipediaSystemDescriptor = new WikipediaSystemDescriptor("irc.wikimedia.org", 6667);
+
+    // Define InputDescriptors for consuming wikipedia data
+    WikipediaInputDescriptor wikipediaInputDescriptor = wikipediaSystemDescriptor
+        .getInputDescriptor("en-wikipedia")
+        .withChannel(WIKIPEDIA_CHANNEL);
+    WikipediaInputDescriptor wiktionaryInputDescriptor = wikipediaSystemDescriptor
+        .getInputDescriptor("en-wiktionary")
+        .withChannel(WIKTIONARY_CHANNEL);
+    WikipediaInputDescriptor wikiNewsInputDescriptor = wikipediaSystemDescriptor
+        .getInputDescriptor("en-wikinews")
+        .withChannel(WIKINEWS_CHANNEL);
+
+    // Define a system descriptor for Kafka
+    KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor("kafka")
+        .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
+        .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
+        .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
+
+    // Define an output descriptor; events are serialized as JSON
+    KafkaOutputDescriptor<WikipediaFeedEvent> statsOutputDescriptor =
+        kafkaSystemDescriptor.getOutputDescriptor("wikipedia-stats-3", new JsonSerdeV2<>(
+            WikipediaFeedEvent.class));
+
+    // Messages come from WikipediaConsumer so we know the type is WikipediaFeedEvent
+    MessageStream<WikipediaFeedEvent> wikipediaEvents = appDescriptor.getInputStream(wikipediaInputDescriptor);
+    MessageStream<WikipediaFeedEvent> wiktionaryEvents = appDescriptor.getInputStream(wiktionaryInputDescriptor);
+    MessageStream<WikipediaFeedEvent> wikiNewsEvents = appDescriptor.getInputStream(wikiNewsInputDescriptor);
+
+    // Output stream for the events that pass the async stage
+    OutputStream<WikipediaFeedEvent> wikipediaStats =
+        appDescriptor.getOutputStream(statsOutputDescriptor);
+
+    // Merge inputs
+    MessageStream<WikipediaFeedEvent> allWikipediaEvents =
+        MessageStream.mergeAll(ImmutableList.of(wikipediaEvents, wiktionaryEvents, wikiNewsEvents));
+
+    // Run every event through the async function and send whatever it emits
+    allWikipediaEvents
+        .flatMapAsync(new MyAsyncFlatMapFunction())
+        .sendTo(wikipediaStats);
+  }
+
+  /**
+   * A sample async map function to mimic asynchronous behavior in the pipeline.
+   * In a real world example this would be replaced with remote IO.
+   */
+  static class MyAsyncFlatMapFunction implements AsyncFlatMapFunction<WikipediaFeedEvent, WikipediaFeedEvent> {
+    /**
+     * Sleeps for a random delay of up to ten seconds on an async thread, then randomly keeps or drops the event.
+     *
+     * @param wikipediaFeedEvent the event to process
+     * @return a stage completing with a singleton collection holding the event, or an empty collection
+     */
+    @Override
+    public CompletionStage<Collection<WikipediaFeedEvent>> apply(WikipediaFeedEvent wikipediaFeedEvent) {
+      return CompletableFuture.supplyAsync(() -> {
+        log.debug("Executing filter function for {}", wikipediaFeedEvent.getChannel());
+        boolean res;
+        try {
+          Thread.sleep((long) (Math.random() * 10000));
+          res = Math.random() > 0.5;
+        } catch (InterruptedException ec) {
+          // Restore the interrupt flag so the owning executor can still observe the interruption.
+          Thread.currentThread().interrupt();
+          res = false;
+        }
+
+        log.debug("Finished executing filter function for {} with result {}.", wikipediaFeedEvent.getChannel(), res);
+
+        return res ? Collections.singleton(wikipediaFeedEvent) : Collections.emptyList();
+      });
+    }
+  }
+
+
+  /**
+   * Executes the application using the local application runner.
+   * It takes two required command line arguments:
+   *   config-factory: fully qualified class name of the config factory, e.g.
+   *                   {@link org.apache.samza.config.factories.PropertiesConfigFactory}
+   *   config-path: path to application properties
+   *
+   * @param args command line arguments
+   */
+  public static void main(String[] args) {
+    CommandLine cmdLine = new CommandLine();
+    OptionSet options = cmdLine.parser().parse(args);
+    Config config = cmdLine.loadConfig(options);
+
+    WikipediaAsyncApplication app = new WikipediaAsyncApplication();
+    LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
+    runner.run();
+    runner.waitForFinish();
+  }
+}
+