Part 2: Spark using Scala
sudo apt update
java -version
sudo apt install default-jdk
sudo apt install scala
wget https://dlcdn.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
sudo tar -xvzf spark-3.0.3-bin-hadoop2.7.tgz
sudo mv spark-3.0.3-bin-hadoop2.7 /opt/spark
cd /opt/spark
export SPARK_HOME=/opt/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
To make these exports permanent, add the two export lines to ~/.bashrc.
echo $PATH
/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/spark/bin:/opt/spark/sbin
ls
bin data jars LICENSE NOTICE R RELEASE yarn
conf examples kubernetes licenses python README.md sbin
spark-shell
(spark-shell starts a Scala REPL with a SparkContext already available as sc and a SparkSession as spark)
val rdd1 = sc.textFile("/home/nom_user/hello.txt")
rdd1.count
val list1 = List("apple","banana","orange")
val rdd2 = sc.parallelize(list1)
rdd2.collect
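As an illustration beyond the transcript, transformations can be chained on the same parallelized list; rdd3 is a hypothetical name, and collect is what triggers the (lazy) computation:
val rdd3 = rdd2.map(fruit => fruit.toUpperCase).filter(fruit => fruit.startsWith("B"))
rdd3.collect   // expected: Array(BANANA)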
scala> val rdd4 = sc.textFile("/home/oussama/hello.txt")
rdd4: org.apache.spark.rdd.RDD[String] = /home/oussama/hello.txt MapPartitionsRDD[20] at textFile at <console>:23

scala> rdd4.collect
res9: Array[String] = Array(Bonjour tout le monde Big Data, Formation Big Data Jour Jour, Jour 2)

scala> val count1 = rdd4.flatMap(line => line.split(" "))
count1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at flatMap at <console>:23

scala> count1.collect
res10: Array[String] = Array(Bonjour, tout, le, monde, Big, Data, Formation, Big, Data, Jour, Jour, Jour, 2)

scala> val count2 = count1.map(word => (word,1))
count2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at map at <console>:23

scala> count2.collect
res11: Array[(String, Int)] = Array((Bonjour,1), (tout,1), (le,1), (monde,1), (Big,1), (Data,1), (Formation,1), (Big,1), (Data,1), (Jour,1), (Jour,1), (Jour,1), (2,1))
scala> val count3 = count2.reduceByKey(_ + _)
count3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:23

scala> count3.collect
res12: Array[(String, Int)] = Array((monde,1), (Big,2), (le,1), (2,1), (tout,1), (Bonjour,1), (Jour,3), (Formation,1), (Data,2))
scala> count3.saveAsTextFile("/home/oussama/resultat-spark")
/home/oussama/resultat-spark/
└── part-00000
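For reference, the word-count steps above can be chained into a single expression. This is a minimal sketch assuming the same input file as the transcript; the output path resultat-spark-chaine is hypothetical, since saveAsTextFile fails if the target directory already exists:
val counts = sc.textFile("/home/oussama/hello.txt")
  .flatMap(line => line.split(" "))   // split each line into words
  .map(word => (word, 1))             // pair each word with a count of 1
  .reduceByKey(_ + _)                 // sum the counts per word
counts.saveAsTextFile("/home/oussama/resultat-spark-chaine")   // hypothetical output path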
scala> val rddmap = sc.parallelize(List("test1 test2", "test3 test4", "test5 test6"))
rddmap: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at parallelize at <console>:23

scala> rddmap.map(x => x.split(" ")).collect
res16: Array[Array[String]] = Array(Array(test1, test2), Array(test3, test4), Array(test5, test6))

scala> rddmap.flatMap(x => x.split(" ")).collect
res17: Array[String] = Array(test1, test2, test3, test4, test5, test6)

Note the difference: map produces one array per input line, while flatMap flattens those arrays into a single RDD of words.
scala> val rdd = sc.parallelize(List((1, "toto", "yoyo"), (2, "titi", "jiji"), (3, "tata", "nono")))
rdd: org.apache.spark.rdd.RDD[(Int, String, String)] = ParallelCollectionRDD[33] at parallelize at <console>:23

scala> val dataframe = rdd.toDF("id", "nom", "prénom")
dataframe: org.apache.spark.sql.DataFrame = [id: int, nom: string ... 1 more field]
scala> dataframe.createOrReplaceTempView("personnes")
scala> val dataframeSQL = spark.sql("select * from personnes")
dataframeSQL: org.apache.spark.sql.DataFrame = [id: int, nom: string ... 1 more field]

scala> dataframeSQL.show()
+---+----+------+
| id| nom|prénom|
+---+----+------+
|  1|toto|  yoyo|
|  2|titi|  jiji|
|  3|tata|  nono|
+---+----+------+
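The same queries can also be expressed directly with the DataFrame API instead of SQL. A minimal sketch using the columns from the example above (the $-column syntax works in spark-shell because spark.implicits._ is imported automatically):
dataframe.select("nom", "prénom").show()   // project two columns
dataframe.filter($"id" > 1).show()         // keep rows with id greater than 1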
scala> spark.sql("SELECT count(*) FROM personnes").show
+--------+
|count(1)|
+--------+
| 3|
+--------+
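The same row count can be obtained without SQL; as a sketch:
dataframe.count()   // returns 3 as a Long, matching the SQL count(*) above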