Commit 2cf9e06

hlcianfagna and amotl authored
Spark/Scala: Load Spark data frame into CrateDB using Scala and HTTP (#520)
* New example with Scala Spark and the CrateDB http endpoint * Typo * Update by-language/scala-spark-http/README.md Co-authored-by: Andreas Motl <andreas.motl@crate.io> * Update by-language/scala-spark-http/README.md Co-authored-by: Andreas Motl <andreas.motl@crate.io> * Update by-language/scala-spark-http/README.md Co-authored-by: Andreas Motl <andreas.motl@crate.io> * Update by-language/scala-spark-http/README.md Co-authored-by: Andreas Motl <andreas.motl@crate.io> * Moving example to by-dataframe folder as suggested by amotl --------- Co-authored-by: Andreas Motl <andreas.motl@crate.io>
1 parent ca65cea commit 2cf9e06

File tree

3 files changed: +106 −0 lines
Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
# Load Spark data frame into CrateDB using Scala and HTTP

This example demonstrates how a Spark data frame can be loaded into CrateDB using the CrateDB HTTP endpoint.

It assumes there is a CrateDB instance running on localhost accepting connections with the default `crate` superuser, and it relies on the following table being created:

.. code-block:: sql

    CREATE TABLE myschema.mytable (
        examplefield1 TIMESTAMP,
        anotherfield GEO_POINT,
        jsonpayload OBJECT
    );

When applicable for your environment, you may want to consider replacing `scalaj.http` with asynchronous alternatives like `akka.http` or `AsyncHttpClient`.
You may also want to explore whether connection pooling is useful in your environment,
and whether JDBC calls leveraging the PostgreSQL wire protocol are more convenient
for your particular case.
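As a rough illustration of the JDBC route mentioned above, the following is a minimal, hedged sketch. It assumes the PostgreSQL JDBC driver (`"org.postgresql" % "postgresql"`) is on the classpath and CrateDB listening on its default PostgreSQL wire protocol port 5432; the object name `JdbcSketch` and the reduced column set are hypothetical and not part of the original example.

```scala
import java.sql.{DriverManager, Timestamp}

// Hedged sketch only: inserts one row over the PostgreSQL wire protocol
// instead of the HTTP endpoint. Requires a running CrateDB instance.
object JdbcSketch {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection(
      "jdbc:postgresql://localhost:5432/", "crate", "")
    try {
      val stmt = conn.prepareStatement(
        "INSERT INTO myschema.mytable (examplefield1, jsonpayload) VALUES (?, ?)")
      stmt.setTimestamp(1, new Timestamp(System.currentTimeMillis()))
      stmt.setString(2, """{"example_quantity_field": 30, "example_string_field": "abc"}""")
      stmt.addBatch()        // accumulate rows, then send them as one batch
      stmt.executeBatch()
    } finally conn.close()
  }
}
```

Batched prepared statements (`addBatch`/`executeBatch`) are the JDBC counterpart of the HTTP bulk operations used below; which performs better depends on your driver, batch sizes, and network setup.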
That said, note that this example uses [CrateDB's HTTP bulk operations] to ingest
data, which is currently the most efficient way to do it.

[CrateDB's HTTP bulk operations]: https://cratedb.com/docs/guide/performance/inserts/bulk.html
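For reference, a bulk request against the `_sql` endpoint is a single JSON document carrying the parameterized statement plus one argument list per row. The following shape mirrors what the Scala code in this example posts; the concrete values are illustrative (timestamps shown as epoch milliseconds), and the exact serialization of each argument depends on json4s rendering:

.. code-block:: json

    {
      "stmt": "INSERT INTO myschema.mytable (examplefield1, anotherfield, jsonpayload) VALUES (?, ?, ?)",
      "bulk_args": [
        [1720605600000, [9.744417, 47.413417], "{\"example_quantity_field\": 30, \"example_string_field\": \"abc\"}"],
        [1720609200000, [13.46738, 52.50463], "{\"example_quantity_field\": 40, \"example_string_field\": \"def\"}"]
      ]
    }

CrateDB answers a bulk request with a `results` array containing one entry (with a `rowcount`) per bulk argument, so each row's outcome can be checked individually.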

You can run this example with [sbt]:

.. code-block:: shell

    sbt run

[sbt]: https://www.scala-sbt.org/download/
Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
import scalaj.http.{Http, HttpOptions}
import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.sql.types._
import org.json4s.jackson.Serialization
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.sql.Timestamp

object SparkCrateDBhttpExample {
  def main(args: Array[String]): Unit = {
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")

    val data = Seq(
      Row(Timestamp.valueOf(LocalDateTime.parse("2024-07-10 10:00", formatter)), Array(9.744417, 47.413417), """{"example_quantity_field": 30, "example_string_field": "abc"}"""),
      Row(Timestamp.valueOf(LocalDateTime.parse("2024-07-10 11:00", formatter)), Array(13.46738, 52.50463), """{"example_quantity_field": 40, "example_string_field": "def"}""")
    )

    val spark = SparkSession.builder
      .appName("test")
      .master("local[*]")
      .getOrCreate()

    val df = spark.createDataFrame(
      spark.sparkContext.parallelize(data),
      StructType(
        List(
          StructField("examplefield1", TimestampType, true),
          StructField("anotherfield", ArrayType(DoubleType), true),
          StructField("jsonpayload", StringType, true)
        )
      )
    )

    val url = "http://localhost:4200/_sql"

    // Build a parameterized INSERT statement from the data frame's columns.
    val columns = df.columns.mkString(", ")
    val placeholders = df.columns.map(_ => "?").mkString(", ")
    val stmt = s"INSERT INTO myschema.mytable ($columns) VALUES ($placeholders)"

    val columnNames = df.columns
    df.foreachPartition { partition =>
      // Collect this partition's rows into the bulk_args format:
      // one list of parameter values per row.
      val bulkArgs: List[List[Any]] = partition.map { row =>
        columnNames.indices.map(i => row.get(i)).toList
      }.toList

      if (bulkArgs.nonEmpty) {
        val data = Map(
          "stmt" -> stmt,
          "bulk_args" -> bulkArgs
        )

        implicit val formats = org.json4s.DefaultFormats
        val jsonString = Serialization.write(data)

        // A single HTTP request ingests all rows of the partition.
        val response = Http(url)
          .postData(jsonString)
          .header("Content-Type", "application/json")
          .asString

        println(response)
      }
    }
  }
}
Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
name := "SparkCrateDBhttpExample"

version := "0.1"

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.scala-lang.modules" %% "scala-xml" % "1.0.6",
  "org.scalaj" %% "scalaj-http" % "2.4.2",
  "org.apache.spark" %% "spark-sql" % "2.4.8"
)
