-
Notifications
You must be signed in to change notification settings - Fork 5
Open
Description
I am running my code locally in IntelliJ IDEA with the StreamingLens Maven dependency.
I am getting the error below, and there is no output. Please let me know what I am doing wrong here.
package com.manu.sstreaming;
import com.qubole.spark.streaminglens.StreamingLens;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.streaming.Trigger;
import scala.Predef;
import scala.collection.JavaConversions.*;
import scala.collection.JavaConverters;
import scala.collection.Seq;
/**
 * Structured Streaming word count over a socket source, instrumented with
 * Qubole StreamingLens for micro-batch performance reporting.
 *
 * @author Manu Jose
 * create on : 16/04/20
 */
public class SStreamingNC {

    /**
     * Entry point: reads lines from localhost:9999, splits them into words,
     * maintains a running per-word count, and prints updates to the console
     * every 2 seconds.
     *
     * <p>NOTE(review): the socket source requires a server (e.g. {@code nc -lk 9999})
     * to already be listening when the query starts; otherwise the query fails
     * and no output is produced — a likely cause of the reported symptom.
     *
     * @param args unused (port is hard-coded; see the commented-out parse below)
     * @throws Exception if the streaming query terminates with an error or the
     *         wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        String host = "localhost";
        int port = 9999;
        //int port = Integer.parseInt(args[0]);

        SparkSession spark = SparkSession
                .builder()
                .appName("JavaStructuredNetworkWordCount")
                .master("local")
                .getOrCreate();

        // StreamingLens expects its options as an immutable scala Map.
        Map<String, String> options = new HashMap<>();
        options.put("streamingLens.reporter.intervalMinutes", "1");
        scala.collection.immutable.Map<String, String> scalaMap =
                JavaConverters.mapAsScalaMapConverter(options).asScala().toMap(Predef.conforms());

        StreamingLens streamingLens = new StreamingLens(spark, scalaMap);
        // Register before the query starts so StreamingLens observes every micro-batch.
        streamingLens.registerListeners();

        // Expose streaming metrics through the Spark metrics system.
        spark.sql("SET spark.sql.streaming.metricsEnabled=true");

        // DataFrame representing the stream of input lines from host:port.
        Dataset<Row> lines = spark
                .readStream()
                .format("socket")
                .option("host", host)
                .option("port", port)
                .load();

        // Split each line into individual words.
        Dataset<String> words = lines.as(Encoders.STRING()).flatMap(
                (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
                Encoders.STRING());

        // Running word count, updated every micro-batch.
        Dataset<Row> wordCounts = words.groupBy("value").count();

        // Start the query that prints the running counts to the console.
        StreamingQuery query = wordCounts.writeStream()
                .outputMode("update")
                .format("console")
                .queryName("Query_name")
                .trigger(Trigger.ProcessingTime(2 * 1000))
                .start();

        // Fix: block on the query we just started instead of
        // spark.streams().awaitAnyTermination(), which left `query` unused.
        // For this single-query app the wait is equivalent, and awaiting the
        // query directly rethrows its failure cause (e.g. connection refused),
        // making a silent "no output" run diagnosable.
        query.awaitTermination();
    }
}
20/05/01 01:06:10 INFO StateStore: Getting StateStoreCoordinatorRef
20/05/01 01:06:10 INFO StateStore: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@11ddc5d8
20/05/01 01:06:10 INFO StateStore: Env is not null
20/05/01 01:06:10 INFO StateStore: Getting StateStoreCoordinatorRef
20/05/01 01:06:10 INFO StateStore: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@c1c5bf
20/05/01 01:06:10 INFO StateStore: Env is not null
20/05/01 01:06:10 INFO StateStore: Getting StateStoreCoordinatorRef
20/05/01 01:06:10 INFO StateStore: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@62726145
Metadata
Metadata
Assignees
Labels
No labels