Skip to content

Not able to run streaminglens in intellij idea #1

@mjose007

Description

@mjose007

I am running my code locally in intellij idea with sreaming lens maven dependancy .
I am getting below error . No output , let me know what i am doing wrong here


package com.manu.sstreaming;

import com.qubole.spark.streaminglens.StreamingLens;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.streaming.Trigger;
import scala.Predef;
import scala.collection.JavaConversions.*;
import scala.collection.JavaConverters;
import scala.collection.Seq;


/**
 * @author Manu Jose
 * create on : 16/04/20
 */
public class SStreamingNC {
    public static void main(String[] args) throws Exception {


        String host = "localhost";
        int port = 9999;
        //int port = Integer.parseInt(args[0]);


        SparkSession spark = SparkSession
                .builder()
                .appName("JavaStructuredNetworkWordCount")
                .master("local")
                .getOrCreate();


        Map<String, String> options = new HashMap<>();
        options.put("streamingLens.reporter.intervalMinutes", "1");

        scala.collection.immutable.Map<String, String> scalaMap = JavaConverters.mapAsScalaMapConverter(options).asScala().toMap(
                Predef.conforms());
        StreamingLens streamingLens = new StreamingLens(spark, scalaMap);
        streamingLens.registerListeners();




        // Create DataFrame representing the stream of input lines from connection to host:port
        spark.sql("SET spark.sql.streaming.metricsEnabled=true");
        Dataset<Row> lines = spark
                .readStream()
                .format("socket")
                .option("host", host)
                .option("port", port)
                .load();

        // Split the lines into words
        Dataset<String> words = lines.as(Encoders.STRING()).flatMap(
                (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
                Encoders.STRING());

        // Generate running word count
        Dataset<Row> wordCounts = words.groupBy("value").count();


        // Start running the query that prints the running counts to the console
        StreamingQuery query = wordCounts.writeStream()
                .outputMode("update")
                .format("console")
                .queryName("Query_name")
                .trigger(Trigger.ProcessingTime(2 * 1000))
                .start();

       spark.streams().awaitAnyTermination();

    }

}
20/05/01 01:06:10 INFO StateStore: Getting StateStoreCoordinatorRef
20/05/01 01:06:10 INFO StateStore: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@11ddc5d8
20/05/01 01:06:10 INFO StateStore: Env is not null
20/05/01 01:06:10 INFO StateStore: Getting StateStoreCoordinatorRef
20/05/01 01:06:10 INFO StateStore: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@c1c5bf
20/05/01 01:06:10 INFO StateStore: Env is not null
20/05/01 01:06:10 INFO StateStore: Getting StateStoreCoordinatorRef
20/05/01 01:06:10 INFO StateStore: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@62726145

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions