diff --git a/README.md b/README.md
index 09370b8cd4..cdb1a0170f 100644
--- a/README.md
+++ b/README.md
@@ -11,14 +11,16 @@
 Kotlin DataFrame aims to reconcile Kotlin's static typing with the dynamic nature of data
 by utilizing both the full power of the Kotlin language
 and the opportunities provided by intermittent code execution in Jupyter notebooks and REPL.
 
 * **Hierarchical** — represents hierarchical data structures, such as JSON or a tree of JVM objects.
-* **Functional** — data processing pipeline is organized in a chain of `DataFrame` transformation operations. Every operation returns a new instance of `DataFrame` reusing underlying storage wherever it's possible.
+* **Functional** — the data processing pipeline is organized in a chain of `DataFrame` transformation operations.
+* **Immutable** — every operation returns a new instance of `DataFrame`, reusing the underlying storage wherever possible.
 * **Readable** — data transformation operations are defined in DSL close to natural language.
 * **Practical** — provides simple solutions for common problems and the ability to perform complex tasks.
 * **Minimalistic** — simple, yet powerful data model of three column kinds.
-* **Interoperable** — convertable with Kotlin data classes and collections.
+* **Interoperable** — convertible to and from Kotlin data classes and collections. This also means conversion to/from other libraries' data structures is usually quite straightforward!
 * **Generic** — can store objects of any type, not only numbers or strings.
 * **Typesafe** — on-the-fly generation of extension properties for type safe data access with Kotlin-style care for null safety.
 * **Polymorphic** — type compatibility derives from column schema compatibility. You can define a function that requires a special subset of columns in a dataframe but doesn't care about other columns.
+  In notebooks this works out-of-the-box. In ordinary projects this requires casting (for now).
 
 Integrates with [Kotlin kernel for Jupyter](https://github.com/Kotlin/kotlin-jupyter). Inspired by [krangl](https://github.com/holgerbrandl/krangl), Kotlin Collections and [pandas](https://pandas.pydata.org/)

diff --git a/build.gradle.kts b/build.gradle.kts
index d43e57ed4a..1220748e6b 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -196,7 +196,7 @@ allprojects {
         logger.warn("Could not set ktlint config on :${this.name}")
     }
 
-    // set the java toolchain version to 11 for all subprojects for CI stability
+    // set the java toolchain version to 21 for all subprojects for CI stability
     extensions.findByType<KotlinJvmProjectExtension>()?.jvmToolchain(21)
 
     // Attempts to configure buildConfig for each sub-project that uses it

diff --git a/docs/StardustDocs/topics/guides/Guides-And-Examples.md b/docs/StardustDocs/topics/guides/Guides-And-Examples.md
index 49dfed9434..62566338c1 100644
--- a/docs/StardustDocs/topics/guides/Guides-And-Examples.md
+++ b/docs/StardustDocs/topics/guides/Guides-And-Examples.md
@@ -46,8 +46,17 @@
 — explore the GeoDataFrame module that brings a convenient Kotlin DataFrame API to geospatial workflows,
 enhanced with beautiful Kandy-Geo visualizations (*experimental*).
 
+* [Using Unsupported Data Sources](https://github.com/Kotlin/dataframe/tree/master/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples):
+  — a guide by example.
+  While these might one day become proper DataFrame integrations, for now
+  we provide them as examples of how to build such integrations yourself.
+  * [Apache Spark Interop](https://github.com/Kotlin/dataframe/tree/master/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/spark)
+  * [Apache Spark Interop (With Kotlin Spark API)](https://github.com/Kotlin/dataframe/tree/master/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/kotlinSpark)
+  * [Multik Interop](https://github.com/Kotlin/dataframe/tree/master/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/multik)
+  * [JetBrains Exposed Interop](https://github.com/Kotlin/dataframe/tree/master/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/exposed)
+
 * [OpenAPI Guide](https://github.com/Kotlin/dataframe/blob/master/examples/notebooks/json/KeyValueAndOpenApi.ipynb)
 — learn how to parse and explore [OpenAPI](https://swagger.io) JSON structures using Kotlin DataFrame,
 enabling structured access and intuitive analysis of complex API schemas (*experimental*, supports OpenAPI 3.0.0).

diff --git a/docs/StardustDocs/topics/overview.md b/docs/StardustDocs/topics/overview.md
index 6d00883f77..d5d1cb4b89 100644
--- a/docs/StardustDocs/topics/overview.md
+++ b/docs/StardustDocs/topics/overview.md
@@ -36,30 +36,33 @@
 The goal of data wrangling is to assure quality and useful data.
 
 ## Main Features and Concepts
 
-* [**Hierarchical**](hierarchical.md) — the Kotlin DataFrame library provides an ability to read and present data from different sources including not only plain **CSV** but also **JSON** or **[SQL databases](readSqlDatabases.md)**.
-That’s why it has been designed hierarchical and allows nesting of columns and cells.
-
-* [**Interoperable**](collectionsInterop.md) — hierarchical data layout also opens a possibility of converting any objects
-structure in application memory to a data frame and vice versa.
-
-* **Safe** — the Kotlin DataFrame library provides a mechanism of on-the-fly [**generation of extension properties**](extensionPropertiesApi.md)
+* [**Hierarchical**](hierarchical.md) — the Kotlin DataFrame library provides the ability to read and present data from different sources,
+including not only plain **CSV** but also **JSON** or **[SQL databases](readSqlDatabases.md)**.
+This is why it was designed to be hierarchical, allowing nesting of columns and cells.
+* **Functional** — the data processing pipeline is organized in a chain of [`DataFrame`](DataFrame.md) transformation operations.
+* **Immutable** — every operation returns a new instance of [`DataFrame`](DataFrame.md), reusing the underlying storage wherever possible.
+* **Readable** — data transformation operations are defined in a DSL close to natural language.
+* **Practical** — provides simple solutions for common problems and the ability to perform complex tasks.
+* **Minimalistic** — simple, yet powerful data model of three [column kinds](DataColumn.md#column-kinds).
+* [**Interoperable**](collectionsInterop.md) — convertible to and from Kotlin data classes and collections.
+  This also means conversion to/from other libraries' data structures is usually quite straightforward!
+  See our [examples](https://github.com/Kotlin/dataframe/tree/master/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples)
+  for some conversions between DataFrame and [Apache Spark](https://github.com/Kotlin/dataframe/tree/master/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/spark), [Multik](https://github.com/Kotlin/dataframe/tree/master/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/multik), and [JetBrains Exposed](https://github.com/Kotlin/dataframe/tree/master/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/exposed).
+* **Generic** — can store objects of any type, not only numbers or strings.
+* **Typesafe** — the Kotlin DataFrame library provides a mechanism of on-the-fly [**generation of extension properties**](extensionPropertiesApi.md)
 that correspond to the columns of a data frame.
 In interactive notebooks like Jupyter or Datalore, the generation runs after each cell execution.
 In IntelliJ IDEA there's a Gradle plugin for generation properties based on CSV file or JSON file.
 Also, we’re working on a compiler plugin that infers and transforms [`DataFrame`](DataFrame.md) schema while typing.
 You can now clone this [project with many examples](https://github.com/koperagen/df-plugin-demo) showcasing how it allows you to reliably use our most convenient extension properties API.
 The generated properties ensure you’ll never misspell column name and don’t mess up with its type, and of course nullability is also preserved.
-
-* **Generic** — columns can store objects of any type, not only numbers or strings.
-
 * [**Polymorphic**](schemas.md) —
-  if all columns of [`DataFrame`](DataFrame.md) are presented in some other dataframes,
-  then the first one could be a superclass for latter.
-Thus,
-  one can define a function on an interface with some set of columns
-  and then execute it in a safe way on any [`DataFrame`](DataFrame.md) which contains this set of columns.
-
-* **Immutable** — all operations on [`DataFrame`](DataFrame.md) produce new instance, while underlying data is reused wherever it's possible
+  if all columns of a [`DataFrame`](DataFrame.md) instance are present in another dataframe,
+  then the first one will be seen as a superclass for the latter.
+This means you can define a function on an interface with some set of columns
+  and then execute it safely on any [`DataFrame`](DataFrame.md) which contains this same set of columns.
+  In notebooks, this works out-of-the-box.
+  In ordinary projects, this requires casting (for now); a minimal sketch follows below.
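+
+For instance (a minimal sketch with illustrative names; in real projects the schema
+interface is usually generated for you):
+
+```kotlin
+@DataSchema
+interface Person {
+    val name: String
+    val age: Int
+}
+
+// works on any DataFrame whose schema contains `name` and `age`,
+// no matter which other columns it has
+fun DataFrame<Person>.adults(): DataFrame<Person> = filter { age >= 18 }
+
+// in ordinary projects, an untyped frame is first cast to the schema
+val adults = df.cast<Person>().adults()
+```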
 
 ## Syntax

diff --git a/docs/StardustDocs/topics/schemasInheritance.md b/docs/StardustDocs/topics/schemasInheritance.md
index 3cb37b4422..f6b888dd7a 100644
--- a/docs/StardustDocs/topics/schemasInheritance.md
+++ b/docs/StardustDocs/topics/schemasInheritance.md
@@ -18,7 +18,7 @@ New schema interface for `filtered` variable will be derived from previously generated
 interface DataFrameType1 : DataFrameType
 ```
 
-Extension properties for data access are generated only for new and overriden members of `DataFrameType1` interface:
+Extension properties for data access are generated only for new and overridden members of `DataFrameType1` interface:
 
 ```kotlin
 val ColumnsContainer<DataFrameType1>.age: DataColumn<Int> get() = this["age"] as DataColumn<Int>

diff --git a/examples/README.md b/examples/README.md
index d851246bcb..a76febde8d 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -9,6 +9,18 @@
 * [json](idea-examples/json) Using OpenAPI support in DataFrame's Gradle and KSP plugins to access data from [API guru](https://apis.guru/) in a type-safe manner
 * [imdb sql database](https://github.com/zaleslaw/KotlinDataFrame-SQL-Examples) This project prominently showcases how to convert data from an SQL table to a Kotlin DataFrame and how to transform the result of an SQL query into a DataFrame.
+* [unsupported-data-sources](idea-examples/unsupported-data-sources) Showcases how to use DataFrame with
+  (currently) unsupported data libraries such as [Spark](https://spark.apache.org/) and [Exposed](https://github.com/JetBrains/Exposed).
+  The examples show how to convert between Kotlin DataFrame and each library's respective data structures; a minimal sketch follows this list.
+  * **JetBrains Exposed**: See the [exposed folder](./idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/exposed)
+    for an example of using Kotlin DataFrame with [Exposed](https://github.com/JetBrains/Exposed).
+  * **Apache Spark**: See the [spark folder](./idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/spark)
+    for an example of using Kotlin DataFrame with [Spark](https://spark.apache.org/).
+  * **Spark (with Kotlin Spark API)**: See the [kotlinSpark folder](./idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/kotlinSpark)
+    for an example of using Kotlin DataFrame with the [Kotlin Spark API](https://github.com/JetBrains/kotlin-spark-api).
+  * **Multik**: See the [multik folder](./idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/multik)
+    for an example of using Kotlin DataFrame with [Multik](https://github.com/Kotlin/multik).
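+
+  As a rough sketch of what these examples boil down to (the names come from the exposed
+  example itself; `Customers`, `CustomersDf`, and `convertToDataFrame` are defined there):
+
+  ```kotlin
+  val df = transaction(db) {
+      Customers.selectAll().convertToDataFrame<CustomersDf>()
+  }
+  df.groupBy { country }.count().print()
+  ```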
+
 ### Notebook examples

diff --git a/examples/idea-examples/unsupported-data-sources/build.gradle.kts b/examples/idea-examples/unsupported-data-sources/build.gradle.kts
new file mode 100644
index 0000000000..c6560cdae9
--- /dev/null
+++ b/examples/idea-examples/unsupported-data-sources/build.gradle.kts
@@ -0,0 +1,73 @@
+plugins {
+    application
+    kotlin("jvm")
+
+    id("org.jetbrains.kotlinx.dataframe")
+
+    // only needed if `kotlin.dataframe.add.ksp=false` is set in gradle.properties
+    id("com.google.devtools.ksp")
+}
+
+repositories {
+    mavenLocal() // in case of local dataframe development
+    mavenCentral()
+}
+
+dependencies {
+    // implementation("org.jetbrains.kotlinx:dataframe:X.Y.Z")
+    implementation(project(":"))
+
+    // exposed + sqlite database support
+    implementation(libs.sqlite)
+    implementation(libs.exposed.core)
+    implementation(libs.exposed.kotlin.datetime)
+    implementation(libs.exposed.jdbc)
+    implementation(libs.exposed.json)
+    implementation(libs.exposed.money)
+
+    // (kotlin) spark support
+    implementation(libs.kotlin.spark)
+    compileOnly(libs.spark)
+    implementation(libs.log4j.core)
+    implementation(libs.log4j.api)
+
+    // multik support
+    implementation(libs.multik.core)
+    implementation(libs.multik.default)
+}
+
+/**
+ * Runs the kotlinSpark/typedDataset example with Java 11.
+ */
+val runKotlinSparkTypedDataset by tasks.registering(JavaExec::class) {
+    classpath = sourceSets["main"].runtimeClasspath
+    javaLauncher = javaToolchains.launcherFor { languageVersion = JavaLanguageVersion.of(11) }
+    mainClass = "org.jetbrains.kotlinx.dataframe.examples.kotlinSpark.TypedDatasetKt"
+}
+
+/**
+ * Runs the kotlinSpark/untypedDataset example with Java 11.
+ */
+val runKotlinSparkUntypedDataset by tasks.registering(JavaExec::class) {
+    classpath = sourceSets["main"].runtimeClasspath
+    javaLauncher = javaToolchains.launcherFor { languageVersion = JavaLanguageVersion.of(11) }
+    mainClass = "org.jetbrains.kotlinx.dataframe.examples.kotlinSpark.UntypedDatasetKt"
+}
+
+/**
+ * Runs the spark/typedDataset example with Java 11.
+ */
+val runSparkTypedDataset by tasks.registering(JavaExec::class) {
+    classpath = sourceSets["main"].runtimeClasspath
+    javaLauncher = javaToolchains.launcherFor { languageVersion = JavaLanguageVersion.of(11) }
+    mainClass = "org.jetbrains.kotlinx.dataframe.examples.spark.TypedDatasetKt"
+}
+
+/**
+ * Runs the spark/untypedDataset example with Java 11.
+ */
+val runSparkUntypedDataset by tasks.registering(JavaExec::class) {
+    classpath = sourceSets["main"].runtimeClasspath
+    javaLauncher = javaToolchains.launcherFor { languageVersion = JavaLanguageVersion.of(11) }
+    mainClass = "org.jetbrains.kotlinx.dataframe.examples.spark.UntypedDatasetKt"
+}
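+
+// These tasks are typically invoked from the repository root, e.g.
+// (the exact project path may differ depending on how the examples are included in your build):
+//   ./gradlew :examples:idea-examples:unsupported-data-sources:runSparkTypedDataset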

diff --git a/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/exposed/compatibilityLayer.kt b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/exposed/compatibilityLayer.kt
new file mode 100644
index 0000000000..04b84fd8c0
--- /dev/null
+++ b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/exposed/compatibilityLayer.kt
@@ -0,0 +1,107 @@
+package org.jetbrains.kotlinx.dataframe.examples.exposed
+
+import org.jetbrains.exposed.v1.core.BiCompositeColumn
+import org.jetbrains.exposed.v1.core.Column
+import org.jetbrains.exposed.v1.core.Expression
+import org.jetbrains.exposed.v1.core.ExpressionAlias
+import org.jetbrains.exposed.v1.core.ResultRow
+import org.jetbrains.exposed.v1.core.Table
+import org.jetbrains.exposed.v1.jdbc.Query
+import org.jetbrains.kotlinx.dataframe.AnyFrame
+import org.jetbrains.kotlinx.dataframe.DataFrame
+import org.jetbrains.kotlinx.dataframe.annotations.DataSchema
+import org.jetbrains.kotlinx.dataframe.api.convertTo
+import org.jetbrains.kotlinx.dataframe.api.toDataFrame
+import org.jetbrains.kotlinx.dataframe.codeGen.NameNormalizer
+import org.jetbrains.kotlinx.dataframe.impl.schema.DataFrameSchemaImpl
+import org.jetbrains.kotlinx.dataframe.schema.ColumnSchema
+import org.jetbrains.kotlinx.dataframe.schema.DataFrameSchema
+import kotlin.reflect.KProperty1
+import kotlin.reflect.full.isSubtypeOf
+import kotlin.reflect.full.memberProperties
+import kotlin.reflect.typeOf
+
+/**
+ * Retrieves all columns of any [Iterable][Iterable]`<`[ResultRow][ResultRow]`>`, like [Query][Query],
+ * from Exposed row by row and converts the result into a [DataFrame], cast to type [T].
+ *
+ * In notebooks, the untyped version works just as well due to runtime inference :)
+ */
+inline fun <reified T> Iterable<ResultRow>.convertToDataFrame(): DataFrame<T> =
+    convertToDataFrame().convertTo<T>()
+
+/**
+ * Retrieves all columns of an [Iterable][Iterable]`<`[ResultRow][ResultRow]`>` from Exposed, like [Query][Query],
+ * row by row and converts the resulting [Map] of lists into a [DataFrame] by calling
+ * [Map.toDataFrame].
+ */
+@JvmName("convertToAnyFrame")
+fun Iterable<ResultRow>.convertToDataFrame(): AnyFrame {
+    val map = mutableMapOf<String, MutableList<Any?>>()
+    for (row in this) {
+        for (expression in row.fieldIndex.keys) {
+            map.getOrPut(expression.readableName) {
+                mutableListOf()
+            } += row[expression]
+        }
+    }
+    return map.toDataFrame()
+}
+
+/**
+ * Retrieves a simple column name from [this] [Expression].
+ *
+ * Might need to be expanded to cover more types of [Expression].
+ */
+val Expression<*>.readableName: String
+    get() = when (this) {
+        is Column<*> -> name
+        is ExpressionAlias<*> -> alias
+        is BiCompositeColumn<*, *, *> -> getRealColumns().joinToString("_") { it.readableName }
+        else -> toString()
+    }
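+
+// For example (hypothetical values, assuming the Customers table from tables.kt):
+//   Customers.firstName.readableName               == "FirstName"
+//   Customers.firstName.alias("fn").readableName   == "fn"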
+
+/**
+ * Creates a [DataFrameSchema] from the declared [Table] instance.
+ *
+ * This is not needed for the conversion itself, but it can be useful to generate a DataFrame
+ * [@DataSchema][DataSchema] declaration.
+ *
+ * @param columnNameToAccessor Optional [MutableMap] which will be filled with entries mapping
+ *   the SQL column name to the accessor name from the [Table].
+ *   This can be used to define a [NameNormalizer] later.
+ * @see toDataFrameSchemaWithNameNormalizer
+ */
+@Suppress("UNCHECKED_CAST")
+fun Table.toDataFrameSchema(columnNameToAccessor: MutableMap<String, String> = mutableMapOf()): DataFrameSchema {
+    // we use reflection to go over all `Column<*>` properties in the Table object
+    val columns = this::class.memberProperties
+        .filter { it.returnType.isSubtypeOf(typeOf<Column<*>>()) }
+        .associate { prop ->
+            prop as KProperty1<Table, Column<*>>
+
+            // retrieve the SQL column name
+            val columnName = prop.get(this).name
+            // store the SQL column name together with the accessor name in the map
+            columnNameToAccessor[columnName] = prop.name
+
+            // get the column type `T` from `val a: Column<T>`
+            val type = prop.returnType.arguments.first().type!!
+
+            // and we add the name and column schema type to the `columns` map :)
+            columnName to ColumnSchema.Value(type)
+        }
+    return DataFrameSchemaImpl(columns)
+}
+
+/**
+ * Creates a [DataFrameSchema] from the declared [Table] instance, together with a [NameNormalizer]
+ * that converts the SQL column names to the corresponding Kotlin property names.
+ *
+ * This is not needed for the conversion itself, but it can be useful to generate a DataFrame
+ * [@DataSchema][DataSchema] declaration.
+ *
+ * @see toDataFrameSchema
+ */
+fun Table.toDataFrameSchemaWithNameNormalizer(): Pair<DataFrameSchema, NameNormalizer> {
+    val columnNameToAccessor = mutableMapOf<String, String>()
+    return Pair(toDataFrameSchema(columnNameToAccessor), NameNormalizer { columnNameToAccessor[it] ?: it })
+}

diff --git a/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/exposed/main.kt b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/exposed/main.kt
new file mode 100644
index 0000000000..cf157eb423
--- /dev/null
+++ b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/exposed/main.kt
@@ -0,0 +1,76 @@
+package org.jetbrains.kotlinx.dataframe.examples.exposed
+
+import org.jetbrains.exposed.v1.core.Column
+import org.jetbrains.exposed.v1.core.StdOutSqlLogger
+import org.jetbrains.exposed.v1.jdbc.Database
+import org.jetbrains.exposed.v1.jdbc.SchemaUtils
+import org.jetbrains.exposed.v1.jdbc.addLogger
+import org.jetbrains.exposed.v1.jdbc.batchInsert
+import org.jetbrains.exposed.v1.jdbc.deleteAll
+import org.jetbrains.exposed.v1.jdbc.selectAll
+import org.jetbrains.exposed.v1.jdbc.transactions.transaction
+import org.jetbrains.kotlinx.dataframe.api.asSequence
+import org.jetbrains.kotlinx.dataframe.api.count
+import org.jetbrains.kotlinx.dataframe.api.describe
+import org.jetbrains.kotlinx.dataframe.api.groupBy
+import org.jetbrains.kotlinx.dataframe.api.print
+import org.jetbrains.kotlinx.dataframe.api.sortByDesc
+import org.jetbrains.kotlinx.dataframe.size
+import java.io.File
+
+/**
+ * Demonstrates a simple bridge between [Exposed](https://www.jetbrains.com/exposed/) and DataFrame!
+ */
+fun main() {
+    // defining where to find our SQLite database for Exposed
+    val resourceDb = "chinook.db"
+    val dbPath = File(object {}.javaClass.classLoader.getResource(resourceDb)!!.toURI()).absolutePath
+    val db = Database.connect(url = "jdbc:sqlite:$dbPath", driver = "org.sqlite.JDBC")
+
+    // let's read the database!
+    val df = transaction(db) {
+        addLogger(StdOutSqlLogger)
+
+        // tables in Exposed need to be defined, see tables.kt
+        SchemaUtils.create(Customers, Artists, Albums)
+
+        // Perform the specific query you want to read into the DataFrame.
+        // Note: DataFrames are in-memory structures, so don't make it too large if you don't have the RAM ;)
+        val query = Customers.selectAll() // .where { Customers.company.isNotNull() }
+
+        // read and convert the query to a typed DataFrame
+        // see compatibilityLayer.kt for how we created convertToDataFrame<>()
+        // and see tables.kt for how we created CustomersDf!
+        query.convertToDataFrame<CustomersDf>()
+    }
+
+    println(df.size())
+
+    // now that we have a DataFrame, we can perform DataFrame operations,
+    // like seeing how often a country is represented
+    df.groupBy { country }.count()
+        .sortByDesc { "count"<Int>() }
+        .print(columnTypes = true, borders = true)
+
+    // or just general statistics
+    df.describe()
+        .print(columnTypes = true, borders = true)
+
+    // or make plots using Kandy! It's all up to you
+
+    // writing a DataFrame back into an SQL database with Exposed can also be done easily!
+    transaction(db) {
+        addLogger(StdOutSqlLogger)
+
+        // first delete the original contents
+        Customers.deleteAll()
+
+        // batch-insert our dataframe back into the SQL database as a sequence of rows
+        Customers.batchInsert(df.asSequence()) { dfRow ->
+            // we simply go over each value in the row and put it in the right place in the Exposed statement
+            for (column in Customers.columns) {
+                this[column as Column<Any?>] = dfRow[column.name]
+            }
+        }
+    }
+}
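+
+// A quick sanity check one could append to main() (hypothetical, not part of the original example):
+// transaction(db) { check(Customers.selectAll().count() == df.rowsCount().toLong()) }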

diff --git a/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/exposed/tables.kt b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/exposed/tables.kt
new file mode 100644
index 0000000000..500c62d039
--- /dev/null
+++ b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/exposed/tables.kt
@@ -0,0 +1,98 @@
+package org.jetbrains.kotlinx.dataframe.examples.exposed
+
+import org.jetbrains.exposed.v1.core.Column
+import org.jetbrains.exposed.v1.core.Table
+import org.jetbrains.kotlinx.dataframe.annotations.ColumnName
+import org.jetbrains.kotlinx.dataframe.annotations.DataSchema
+import org.jetbrains.kotlinx.dataframe.api.generateDataClasses
+import org.jetbrains.kotlinx.dataframe.api.print
+import org.jetbrains.kotlinx.dataframe.codeGen.NameNormalizer
+
+object Albums : Table() {
+    val albumId: Column<Int> = integer("AlbumId").autoIncrement()
+    val title: Column<String> = varchar("Title", 160)
+    val artistId: Column<Int> = integer("ArtistId")
+
+    override val primaryKey = PrimaryKey(albumId)
+}
+
+object Artists : Table() {
+    val artistId: Column<Int> = integer("ArtistId").autoIncrement()
+    val name: Column<String> = varchar("Name", 120)
+
+    override val primaryKey = PrimaryKey(artistId)
+}
+
+object Customers : Table() {
+    val customerId: Column<Int> = integer("CustomerId").autoIncrement()
+    val firstName: Column<String> = varchar("FirstName", 40)
+    val lastName: Column<String> = varchar("LastName", 20)
+    val company: Column<String?> = varchar("Company", 80).nullable()
+    val address: Column<String?> = varchar("Address", 70).nullable()
+    val city: Column<String?> = varchar("City", 40).nullable()
+    val state: Column<String?> = varchar("State", 40).nullable()
+    val country: Column<String?> = varchar("Country", 40).nullable()
+    val postalCode: Column<String?> = varchar("PostalCode", 10).nullable()
+    val phone: Column<String?> = varchar("Phone", 24).nullable()
+    val fax: Column<String?> = varchar("Fax", 24).nullable()
+    val email: Column<String> = varchar("Email", 60)
+    val supportRepId: Column<Int?> = integer("SupportRepId").nullable()
+
+    override val primaryKey = PrimaryKey(customerId)
+}
+
+/**
+ * Exposed requires you to define [Table] instances to get
+ * type-safe access to your columns and data.
+ *
+ * While DataFrame can infer types at runtime, which is enough for Kotlin Notebook,
+ * to get type-safe access at compile time we need to define a [@DataSchema][DataSchema].
+ *
+ * This is what we created the [toDataFrameSchema] function for!
+ */
+fun main() {
+    val (schema, nameNormalizer) = Customers.toDataFrameSchemaWithNameNormalizer()
+
+    // checking whether the schema is converted correctly.
+    // schema.print()
+
+    // printing a @DataSchema data class to copy-paste into the code.
+    // we use a NameNormalizer to let DataFrame generate the same accessors as in the Table
+    // while keeping the correct column names
+    schema.generateDataClasses(
+        name = "CustomersDf",
+        nameNormalizer = nameNormalizer,
+    ).print()
+}
+
+// created by Customers.toDataFrameSchema()
+// the same can be done for the other tables
+@DataSchema
+data class CustomersDf(
+    @ColumnName("Address")
+    val address: String?,
+    @ColumnName("City")
+    val city: String?,
+    @ColumnName("Company")
+    val company: String?,
+    @ColumnName("Country")
+    val country: String?,
+    @ColumnName("CustomerId")
+    val customerId: Int,
+    @ColumnName("Email")
+    val email: String,
+    @ColumnName("Fax")
+    val fax: String?,
+    @ColumnName("FirstName")
+    val firstName: String,
+    @ColumnName("LastName")
+    val lastName: String,
+    @ColumnName("Phone")
+    val phone: String?,
+    @ColumnName("PostalCode")
+    val postalCode: String?,
+    @ColumnName("State")
+    val state: String?,
+    @ColumnName("SupportRepId")
+    val supportRepId: Int?,
+)

diff --git a/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/kotlinSpark/compatibilityLayer.kt b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/kotlinSpark/compatibilityLayer.kt
new file mode 100644
index 0000000000..2b7e2bc824
--- /dev/null
+++ b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/kotlinSpark/compatibilityLayer.kt
@@ -0,0 +1,8 @@
+@file:Suppress("ktlint:standard:no-empty-file")
+
+package org.jetbrains.kotlinx.dataframe.examples.kotlinSpark
+
+/*
+ * See ../spark/compatibilityLayer.kt for the implementation.
+ * It's the same with and without the Kotlin Spark API.
+ */

diff --git a/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/kotlinSpark/typedDataset.kt b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/kotlinSpark/typedDataset.kt
new file mode 100644
index 0000000000..59715ceddc
--- /dev/null
+++ b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/kotlinSpark/typedDataset.kt
@@ -0,0 +1,78 @@
+@file:Suppress("ktlint:standard:function-signature")
+
+package org.jetbrains.kotlinx.dataframe.examples.kotlinSpark
+
+import org.apache.spark.sql.Dataset
+import org.jetbrains.kotlinx.dataframe.annotations.DataSchema
+import org.jetbrains.kotlinx.dataframe.api.aggregate
+import org.jetbrains.kotlinx.dataframe.api.groupBy
+import org.jetbrains.kotlinx.dataframe.api.max
+import org.jetbrains.kotlinx.dataframe.api.mean
+import org.jetbrains.kotlinx.dataframe.api.min
+import org.jetbrains.kotlinx.dataframe.api.print
+import org.jetbrains.kotlinx.dataframe.api.schema
+import org.jetbrains.kotlinx.dataframe.api.std
+import org.jetbrains.kotlinx.dataframe.api.toDataFrame
+import org.jetbrains.kotlinx.dataframe.api.toList
+import org.jetbrains.kotlinx.spark.api.withSpark
+
+/**
+ * With the Kotlin Spark API, normal Kotlin data classes are supported,
+ * meaning we can reuse the same class for Spark and DataFrame!
+ *
+ * Also, since we use an actual class to define the schema, we need no type conversion!
+ *
+ * See [Person] and [Name] for an example.
+ *
+ * NOTE: You will likely need to run this function with Java 8 or 11 for it to work correctly.
+ * Use the `runKotlinSparkTypedDataset` Gradle task to do so.
+ */
+fun main() = withSpark {
+    // Creating a Spark Dataset. Usually, this is loaded from some server or database.
+    val rawDataset: Dataset<Person> = listOf(
+        Person(Name("Alice", "Cooper"), 15, "London", 54, true),
+        Person(Name("Bob", "Dylan"), 45, "Dubai", 87, true),
+        Person(Name("Charlie", "Daniels"), 20, "Moscow", null, false),
+        Person(Name("Charlie", "Chaplin"), 40, "Milan", null, true),
+        Person(Name("Bob", "Marley"), 30, "Tokyo", 68, true),
+        Person(Name("Alice", "Wolf"), 20, null, 55, false),
+        Person(Name("Charlie", "Byrd"), 30, "Moscow", 90, true),
+    ).toDS()
+
+    // we can perform large operations in Spark.
+    // DataFrames are in-memory structures, so this is a good place to limit the number of rows if you don't have the RAM ;)
+    val dataset = rawDataset.filter { it.age > 17 }
+
+    // and convert it to a DataFrame via a typed List
+    val dataframe = dataset.collectAsList().toDataFrame()
+    dataframe.schema().print()
+    dataframe.print(columnTypes = true, borders = true)
+
+    // now we can use DataFrame-specific functions
+    val ageStats = dataframe
+        .groupBy { city }.aggregate {
+            mean { age } into "meanAge"
+            std { age } into "stdAge"
+            min { age } into "minAge"
+            max { age } into "maxAge"
+        }
+
+    ageStats.print(columnTypes = true, borders = true)
+
+    // and when we want to convert a DataFrame back to Spark, we can do the same trick via a typed List
+    val sparkDatasetAgain = dataframe.toList().toDS()
+    sparkDatasetAgain.printSchema()
+    sparkDatasetAgain.show()
+}
+
+@DataSchema
+data class Name(val firstName: String, val lastName: String)
+
+@DataSchema
+data class Person(
+    val name: Name,
+    val age: Int,
+    val city: String?,
+    val weight: Int?,
+    val isHappy: Boolean,
+)
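+
+// One could extend main() with a round-trip check, e.g. (hypothetical, not part of the original example):
+// check(sparkDatasetAgain.count() == dataframe.rowsCount().toLong())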
diff --git a/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/kotlinSpark/untypedDataset.kt b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/kotlinSpark/untypedDataset.kt
new file mode 100644
index 0000000000..b5aeebecc9
--- /dev/null
+++ b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/kotlinSpark/untypedDataset.kt
@@ -0,0 +1,74 @@
+@file:Suppress("ktlint:standard:function-signature")
+
+package org.jetbrains.kotlinx.dataframe.examples.kotlinSpark
+
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.Row
+import org.jetbrains.kotlinx.dataframe.api.aggregate
+import org.jetbrains.kotlinx.dataframe.api.groupBy
+import org.jetbrains.kotlinx.dataframe.api.max
+import org.jetbrains.kotlinx.dataframe.api.mean
+import org.jetbrains.kotlinx.dataframe.api.min
+import org.jetbrains.kotlinx.dataframe.api.print
+import org.jetbrains.kotlinx.dataframe.api.schema
+import org.jetbrains.kotlinx.dataframe.api.std
+import org.jetbrains.kotlinx.dataframe.examples.spark.convertToDataFrame
+import org.jetbrains.kotlinx.dataframe.examples.spark.convertToDataFrameByInference
+import org.jetbrains.kotlinx.dataframe.examples.spark.convertToSpark
+import org.jetbrains.kotlinx.spark.api.col
+import org.jetbrains.kotlinx.spark.api.gt
+import org.jetbrains.kotlinx.spark.api.withSpark
+
+/**
+ * Since this time we don't know the schema at compile time, we need to do
+ * some schema mapping between Spark and DataFrame.
+ *
+ * We will use spark/compatibilityLayer.kt to do this.
+ * Take a look at that file for the implementation details!
+ *
+ * NOTE: You will likely need to run this function with Java 8 or 11 for it to work correctly.
+ * Use the `runKotlinSparkUntypedDataset` Gradle task to do so.
+ */
+fun main() = withSpark {
+    // Creating a Spark Dataframe (untyped Dataset). Usually, this is loaded from some server or database.
+    val rawDataset: Dataset<Row> = listOf(
+        Person(Name("Alice", "Cooper"), 15, "London", 54, true),
+        Person(Name("Bob", "Dylan"), 45, "Dubai", 87, true),
+        Person(Name("Charlie", "Daniels"), 20, "Moscow", null, false),
+        Person(Name("Charlie", "Chaplin"), 40, "Milan", null, true),
+        Person(Name("Bob", "Marley"), 30, "Tokyo", 68, true),
+        Person(Name("Alice", "Wolf"), 20, null, 55, false),
+        Person(Name("Charlie", "Byrd"), 30, "Moscow", 90, true),
+    ).toDF()
+
+    // we can perform large operations in Spark.
+    // DataFrames are in-memory structures, so this is a good place to limit the number of rows if you don't have the RAM ;)
+    val dataset = rawDataset.filter(col("age") gt 17)
+
+    // using inference
+    val df1 = dataset.convertToDataFrameByInference()
+    df1.schema().print()
+    df1.print(columnTypes = true, borders = true)
+
+    // using full schema mapping
+    val df2 = dataset.convertToDataFrame()
+    df2.schema().print()
+    df2.print(columnTypes = true, borders = true)
+
+    // now we can use DataFrame-specific functions
+    val ageStats = df1
+        .groupBy("city").aggregate {
+            mean("age") into "meanAge"
+            std("age") into "stdAge"
+            min("age") into "minAge"
+            max("age") into "maxAge"
+        }
+
+    ageStats.print(columnTypes = true, borders = true)
+
+    // and when we want to convert a DataFrame back to Spark, we use the `convertToSpark()` extension function,
+    // which performs the necessary schema mapping under the hood
+    val sparkDataset = df2.convertToSpark(spark, sc)
+    sparkDataset.printSchema()
+    sparkDataset.show()
+}

diff --git a/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/multik/compatibilityLayer.kt b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/multik/compatibilityLayer.kt
new file mode 100644
index 0000000000..5e71993dac
--- /dev/null
+++ b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/multik/compatibilityLayer.kt
@@ -0,0 +1,374 @@
+@file:OptIn(ExperimentalTypeInference::class)
+
+package org.jetbrains.kotlinx.dataframe.examples.multik
+
+import org.jetbrains.kotlinx.dataframe.AnyFrame
+import org.jetbrains.kotlinx.dataframe.ColumnSelector
+import org.jetbrains.kotlinx.dataframe.ColumnsSelector
+import org.jetbrains.kotlinx.dataframe.DataColumn
+import org.jetbrains.kotlinx.dataframe.DataFrame
+import org.jetbrains.kotlinx.dataframe.api.ValueProperty
+import org.jetbrains.kotlinx.dataframe.api.cast
+import org.jetbrains.kotlinx.dataframe.api.colsOf
+import org.jetbrains.kotlinx.dataframe.api.column
+import org.jetbrains.kotlinx.dataframe.api.dataFrameOf
+import org.jetbrains.kotlinx.dataframe.api.getColumn
+import org.jetbrains.kotlinx.dataframe.api.getColumns
+import org.jetbrains.kotlinx.dataframe.api.map
+import org.jetbrains.kotlinx.dataframe.api.named
+import org.jetbrains.kotlinx.dataframe.api.toColumn
+import org.jetbrains.kotlinx.dataframe.api.toColumnGroup
+import org.jetbrains.kotlinx.dataframe.api.toDataFrame
+import org.jetbrains.kotlinx.dataframe.columns.BaseColumn
+import org.jetbrains.kotlinx.dataframe.columns.ColumnGroup
+import org.jetbrains.kotlinx.multik.api.mk
+import org.jetbrains.kotlinx.multik.api.ndarray
+import org.jetbrains.kotlinx.multik.ndarray.complex.Complex
+import org.jetbrains.kotlinx.multik.ndarray.data.D1Array
+import org.jetbrains.kotlinx.multik.ndarray.data.D2Array
+import org.jetbrains.kotlinx.multik.ndarray.data.D3Array
+import org.jetbrains.kotlinx.multik.ndarray.data.MultiArray
+import org.jetbrains.kotlinx.multik.ndarray.data.NDArray +import org.jetbrains.kotlinx.multik.ndarray.data.get +import org.jetbrains.kotlinx.multik.ndarray.operations.toList +import org.jetbrains.kotlinx.multik.ndarray.operations.toListD2 +import kotlin.experimental.ExperimentalTypeInference +import kotlin.reflect.KClass +import kotlin.reflect.KType +import kotlin.reflect.full.isSubtypeOf +import kotlin.reflect.typeOf + +// region 1D + +/** Converts a one-dimensional array ([D1Array]) to a [DataColumn] with optional [name]. */ +inline fun D1Array.convertToColumn(name: String = ""): DataColumn { + // we can simply convert the 1D array to a typed list and create a typed column from it + // by using the reified type parameter, DataFrame needs to do no inference :) + val values = this.toList() + return column(values) named name +} + +/** + * Converts a one-dimensional array ([D1Array]) of type [N] into a DataFrame. + * The resulting DataFrame contains a single column named "value", where each element of the array becomes a row in the DataFrame. + * + * @return a DataFrame where each element of the source array is represented as a row in a column named "value" under the schema [ValueProperty]. + */ +@JvmName("convert1dArrayToDataFrame") +inline fun D1Array.convertToDataFrame(): DataFrame> { + // do the conversion like above, but name the column "value"... + val column = this.convertToColumn(ValueProperty<*>::value.name) + // ...so we can cast it to a ValueProperty DataFrame + return dataFrameOf(column).cast>() +} + +/** Converts a [DataColumn] to a one-dimensional array ([D1Array]). */ +@JvmName("convertNumberColumnToMultik") +inline fun DataColumn.convertToMultik(): D1Array where N : Number, N : Comparable { + // we can convert our column to a typed list again to convert it to a multik array + val values = this.toList() + return mk.ndarray(values) +} + +/** Converts a [DataColumn] to a one-dimensional array ([D1Array]). */ +@JvmName("convertComplexColumnToMultik") +inline fun DataColumn.convertToMultik(): D1Array { + // we can convert our column to a typed list again to convert it to a multik array + val values = this.toList() + return mk.ndarray(values) +} + +/** Converts a [DataColumn] selected by [column] to a one-dimensional array ([D1Array]). */ +@JvmName("convertNumberColumnFromDfToMultik") +@OverloadResolutionByLambdaReturnType +inline fun DataFrame.convertToMultik( + crossinline column: ColumnSelector, +): D1Array + where N : Number, N : Comparable { + // use the selector to get the column from this DataFrame and convert it + val col = this.getColumn { column(it) } + return col.convertToMultik() +} + +/** Converts a [DataColumn] selected by [column] to a one-dimensional array ([D1Array]). */ +@JvmName("convertComplexColumnFromDfToMultik") +@OverloadResolutionByLambdaReturnType +inline fun DataFrame.convertToMultik(crossinline column: ColumnSelector): D1Array { + // use the selector to get the column from this DataFrame and convert it + val col = this.getColumn { column(it) } + return col.convertToMultik() +} + +// endregion + +// region 2D + +/** + * Converts a two-dimensional array ([D2Array]) to a DataFrame. + * It will contain `shape[0]` rows and `shape[1]` columns. + * + * Column names can be specified using the [columnNameGenerator] lambda. 
+ * + * The conversion enforces that `multikArray[x][y] == dataframe[x][y]` + */ +@JvmName("convert2dArrayToDataFrame") +inline fun D2Array.convertToDataFrame(columnNameGenerator: (Int) -> String = { "col$it" }): AnyFrame { + // Turning the 2D array into a list of typed columns first, no inference needed + val columns: List> = List(shape[1]) { i -> + this[0..(name = columnNameGenerator(i)) + } + // and make a DataFrame from it + return columns.toDataFrame() +} + +/** + * Converts a [DataFrame] to a two-dimensional array ([D2Array]). + * You'll need to specify which columns to convert using the [columns] selector. + * + * All columns need to be of the same type. If no columns are supplied, the function + * will only succeed if all columns are of the same type. + * + * @see convertToMultikOf + */ +@JvmName("convertNumberColumnsFromDfToMultik") +@OverloadResolutionByLambdaReturnType +inline fun DataFrame.convertToMultik( + crossinline columns: ColumnsSelector, +): D2Array + where N : Number, N : Comparable { + // use the selector to get the columns from this DataFrame and convert them + val cols = this.getColumns { columns(it) } + return cols.convertToMultik() +} + +/** + * Converts a [DataFrame] to a two-dimensional array ([D2Array]). + * You'll need to specify which columns to convert using the [columns] selector. + * + * All columns need to be of the same type. If no columns are supplied, the function + * will only succeed if all columns are of the same type. + * + * @see convertToMultikOf + */ +@JvmName("convertComplexColumnsFromDfToMultik") +@OverloadResolutionByLambdaReturnType +inline fun DataFrame.convertToMultik( + crossinline columns: ColumnsSelector, +): D2Array { + // use the selector to get the columns from this DataFrame and convert them + val cols = this.getColumns { columns(it) } + return cols.convertToMultik() +} + +/** + * Converts a [DataFrame] to a two-dimensional array ([D2Array]). + * You'll need to specify which columns to convert using the `columns` selector. + * + * All columns need to be of the same type. If no columns are supplied, the function + * will only succeed if all columns in [this] are of the same type. + * + * @see convertToMultikOf + */ +@JvmName("convertToMultikGuess") +fun AnyFrame.convertToMultik(): D2Array<*> { + val columnTypes = this.columnTypes().distinct() + val type = columnTypes.singleOrNull() ?: error("found multiple column types: $columnTypes") + return when { + type == typeOf() -> convertToMultik { colsOf() } + type.isSubtypeOf(typeOf()) -> convertToMultik { colsOf() } + type.isSubtypeOf(typeOf()) -> convertToMultik { colsOf() } + type.isSubtypeOf(typeOf()) -> convertToMultik { colsOf() } + type.isSubtypeOf(typeOf()) -> convertToMultik { colsOf() } + type.isSubtypeOf(typeOf()) -> convertToMultik { colsOf() } + type.isSubtypeOf(typeOf()) -> convertToMultik { colsOf() } + else -> error("found multiple column types: $columnTypes") + } +} + +/** + * Converts a [DataFrame] to a two-dimensional array ([D2Array]) by taking all + * columns of type [N]. + * + * Allows you to write `df.convertToMultikOf()`. + * + * @see convertToMultik + */ +@JvmName("convertToMultikOfComplex") +@Suppress("LocalVariableName") +inline fun AnyFrame.convertToMultikOf( + // unused param to avoid overload resolution ambiguity + _klass: KClass = Complex::class, +): D2Array = + convertToMultik { colsOf() } + +/** + * Converts a [DataFrame] to a two-dimensional array ([D2Array]) by taking all + * columns of type [N]. + * + * Allows you to write `df.convertToMultikOf()`. 
+ * + * @see convertToMultik + */ +@JvmName("convertToMultikOfNumber") +@Suppress("LocalVariableName") +inline fun AnyFrame.convertToMultikOf( + // unused param to avoid overload resolution ambiguity + _klass: KClass = Number::class, +): D2Array where N : Number, N : Comparable = convertToMultik { colsOf() } + +/** + * Helper function to convert a list of same-typed [DataColumn]s to a two-dimensional array ([D2Array]). + * We cannot enforce all columns have the same type if we require just a [DataFrame]. + */ +@Suppress("UNCHECKED_CAST") +@JvmName("convertNumberColumnsToMultik") +inline fun List>.convertToMultik(): D2Array where N : Number, N : Comparable { + // to get the list of columns as a list of rows, we need to convert them back to a dataframe first, + // then we can get the values of each row + val rows = this.toDataFrame().map { row -> row.values() as List } + return mk.ndarray(rows) +} + +/** + * Helper function to convert a list of same-typed [DataColumn]s to a two-dimensional array ([D2Array]). + * We cannot enforce all columns have the same type if we require just a [DataFrame]. + */ +@Suppress("UNCHECKED_CAST") +@JvmName("convertComplexColumnsToMultik") +inline fun List>.convertToMultik(): D2Array { + // to get the list of columns as a list of rows, we need to convert them back to a dataframe first, + // then we can get the values of each row + val rows = this.toDataFrame().map { row -> row.values() as List } + return mk.ndarray(rows) +} + +// endregion + +// region higher dimensions + +/** + * Converts a three-dimensional array ([D3Array]) to a DataFrame. + * It will contain `shape[0]` rows and `shape[1]` columns containing lists of size `shape[2]`. + * + * Column names can be specified using the [columnNameGenerator] lambda. + * + * The conversion enforces that `multikArray[x][y][z] == dataframe[x][y][z]` + */ +inline fun D3Array.convertToDataFrameWithLists( + columnNameGenerator: (Int) -> String = { "col$it" }, +): AnyFrame { + val columns: List>> = List(shape[1]) { y -> + this[0..>(name = columnNameGenerator(y)) + } + return columns.toDataFrame() +} + +/** + * Converts a three-dimensional array ([D3Array]) to a DataFrame. + * It will contain `shape[0]` rows and `shape[1]` column groups containing `shape[2]` columns each. + * + * Column names can be specified using the [columnNameGenerator] lambda. + * + * The conversion enforces that `multikArray[x][y][z] == dataframe[x][y][z]` + */ +@JvmName("convert3dArrayToDataFrame") +inline fun D3Array.convertToDataFrame(columnNameGenerator: (Int) -> String = { "col$it" }): AnyFrame { + val columns: List> = List(shape[1]) { y -> + this[0.. + list.toColumn(name = columnNameGenerator(z)) + } // we get shape[2] columns inside each column group + .toColumnGroup(name = columnNameGenerator(y)) + } + return columns.toDataFrame() +} + +/** + * Exploratory recursive function to convert a [MultiArray] of any number of dimensions + * to a `List>` of the same number of dimensions. 
+ */ +fun MultiArray.toListDn(): List<*> { + // Recursive helper function to handle traversal across dimensions + fun toListRecursive(indices: IntArray): List<*> { + // If we are at the last dimension (1D case) + if (indices.size == shape.lastIndex) { + return List(shape[indices.size]) { i -> + this[intArrayOf(*indices, i)] // Collect values for this dimension + } + } + + // For higher dimensions, recursively process smaller dimensions + return List(shape[indices.size]) { i -> + toListRecursive(indices + i) // Add `i` to the current index array + } + } + return toListRecursive(intArrayOf()) +} + +/** + * Converts a multidimensional array ([NDArray]) to a DataFrame. + * Inspired by [toListDn]. + * + * For a single-dimensional array, it will call [D1Array.convertToDataFrame]. + * + * Column names can be specified using the [columnNameGenerator] lambda. + * + * The conversion enforces that `multikArray[a][b][c][d]... == dataframe[a][b][c][d]...` + */ +@Suppress("UNCHECKED_CAST") +inline fun NDArray.convertToDataFrameNestedGroups( + noinline columnNameGenerator: (Int) -> String = { "col$it" }, +): AnyFrame { + if (shape.size == 1) return (this as D1Array).convertToDataFrame() + + // push the first dimension to the end, because this represents the rows in DataFrame, + // and they are accessed by []'s first + return transpose(*(1..(), // cannot inline a recursive function, so pass the type explicitly + columnNameGenerator = columnNameGenerator, + ).let { + // we could just cast this to a DataFrame<*>, because a ColumnGroup<*>: DataFrame + // however, this can sometimes cause issues where instance checks are done at runtime + // this converts it to an actual DataFrame instance + dataFrameOf((it as ColumnGroup<*>).columns()) + } +} + +/** + * Recursive helper function to handle traversal across dimensions. Do not call directly, + * use [convertToDataFrameNestedGroups] instead. + */ +@PublishedApi +internal fun NDArray<*, *>.convertToDataFrameNestedGroupsRecursive( + indices: IntArray, + type: KType, + columnNameGenerator: (Int) -> String, +): BaseColumn<*> { + // If we are at the last dimension (1D case) + if (indices.size == shape.lastIndex) { + return List(shape[indices.size]) { i -> + this[intArrayOf(*indices, i)] // Collect values for this dimension + }.let { + DataColumn.createByType(name = "", values = it, type = type) + } + } + + // For higher dimensions, recursively process smaller dimensions + return List(shape[indices.size]) { i -> + convertToDataFrameNestedGroupsRecursive( + indices = indices + i, // Add `i` to the current index array + type = type, + columnNameGenerator = columnNameGenerator, + ).rename(columnNameGenerator(i)) + }.toColumnGroup("") +} + +// endregion diff --git a/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/multik/io.kt b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/multik/io.kt new file mode 100644 index 0000000000..64581538a1 --- /dev/null +++ b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/multik/io.kt @@ -0,0 +1,23 @@ +package org.jetbrains.kotlinx.dataframe.examples.multik + +import org.jetbrains.kotlinx.dataframe.api.print +import org.jetbrains.kotlinx.multik.api.io.readNPY +import org.jetbrains.kotlinx.multik.api.mk +import org.jetbrains.kotlinx.multik.ndarray.data.D1 +import java.io.File + +/** + * Multik can read/write data from NPY/NPZ files. + * We can use this from DataFrame too! 
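+ * (An `.npy` file stores a single n-dimensional array; an `.npz` file bundles several arrays in one archive.)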
+ *
+ * We use compatibilityLayer.kt for the conversions; check it out for the implementation details!
+ */
+fun main() {
+    val npyFilename = "a1d.npy"
+    val npyFile = File(object {}.javaClass.classLoader.getResource(npyFilename)!!.toURI())
+
+    val mk1 = mk.readNPY<Double, D1>(npyFile)
+    val df1 = mk1.convertToDataFrame()
+
+    df1.print(borders = true, columnTypes = true)
+}

diff --git a/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/multik/main.kt b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/multik/main.kt
new file mode 100644
index 0000000000..c75e763a70
--- /dev/null
+++ b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/multik/main.kt
@@ -0,0 +1,99 @@
+package org.jetbrains.kotlinx.dataframe.examples.multik
+
+import org.jetbrains.kotlinx.dataframe.api.cast
+import org.jetbrains.kotlinx.dataframe.api.colsOf
+import org.jetbrains.kotlinx.dataframe.api.describe
+import org.jetbrains.kotlinx.dataframe.api.mean
+import org.jetbrains.kotlinx.dataframe.api.meanFor
+import org.jetbrains.kotlinx.dataframe.api.print
+import org.jetbrains.kotlinx.dataframe.api.value
+import org.jetbrains.kotlinx.multik.api.mk
+import org.jetbrains.kotlinx.multik.api.rand
+import org.jetbrains.kotlinx.multik.ndarray.data.get
+
+/**
+ * Let's explore some ways we can combine Multik with Kotlin DataFrame.
+ *
+ * We will use compatibilityLayer.kt for the conversions.
+ * Take a look at that file for the implementation details!
+ */
+fun main() {
+    oneDimension()
+    twoDimensions()
+    higherDimensions()
+}
+
+fun oneDimension() {
+    // we can convert a 1D ndarray to a column of a DataFrame:
+    val mk1 = mk.rand<Double>(50)
+    val col1 by mk1.convertToColumn()
+    println(col1)
+
+    // or straight to a DataFrame. It will become the `value` column.
+    val df1 = mk1.convertToDataFrame()
+    println(df1)
+
+    // this allows us to perform any DF operation:
+    println(df1.mean { value })
+    df1.describe().print(borders = true)
+
+    // we can convert back to Multik:
+    val mk2 = df1.convertToMultik { value }
+    // or
+    df1.value.convertToMultik()
+
+    println(mk2)
+}
+
+fun twoDimensions() {
+    // we can also convert a 2D ndarray to a DataFrame.
+    // This conversion will create columns like "col0", "col1", etc.
+    // (careful, when the number of columns is too large, this can cause problems)
+    // but will allow for similar access as in multik,
+    // aka: `multikArray[x][y] == dataframe[x][y]`
+    val mk1 = mk.rand<Double>(5, 10)
+    println(mk1)
+    val df = mk1.convertToDataFrame()
+    df.print()
+
+    // this allows us to perform any DF operation:
+    val means = df.meanFor { ("col0".."col9").cast<Double>() }
+    means.print()
+
+    // we can convert back to Multik in multiple ways.
+    // Multik can only store one type of data, so we need to specify the type or select
+    // only the columns we want:
+    val mk2 = df.convertToMultik { colsOf<Double>() }
+    // or
+    df.convertToMultikOf<Double>()
+    // or if all columns are of the same type:
+    df.convertToMultik()
+
+    println(mk2)
+}
+
+fun higherDimensions() {
+    // Multik can store higher dimensions as well;
+    // however, to convert these to a DataFrame, we need to specify how to do the particular conversion.
+    // For instance, for 3D, we could store a list in each cell of the DF to represent the extra dimension:
+    val mk1 = mk.rand<Double>(5, 4, 3)
+
+    println(mk1)
+
+    val df1 = mk1.convertToDataFrameWithLists()
+    df1.print()
+
+    // Alternatively, this could be solved using column groups.
+ // This subdivides each column into more columns, while ensuring `multikArray[x][y][z] == dataframe[x][y][z]` + val df2 = mk1.convertToDataFrame() + df2.print() + + // For even higher dimensions, we can keep adding more column groups + val mk2 = mk.rand(5, 4, 3, 2) + val df3 = mk2.convertToDataFrameNestedGroups() + df3.print() + + // ...or use nested DataFrames (in FrameColumns) + // (for instance, a 4D matrix could be stored in a 2D DataFrame where each cell is another DataFrame) + // but, we'll leave that as an exercise for the reader :) +} diff --git a/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/multik/multikInsideDataFrame.kt b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/multik/multikInsideDataFrame.kt new file mode 100644 index 0000000000..cfbaa484b0 --- /dev/null +++ b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/multik/multikInsideDataFrame.kt @@ -0,0 +1,115 @@ +package org.jetbrains.kotlinx.dataframe.examples.multik + +import kotlinx.datetime.LocalDate +import org.jetbrains.kotlinx.dataframe.annotations.DataSchema +import org.jetbrains.kotlinx.dataframe.api.append +import org.jetbrains.kotlinx.dataframe.api.cast +import org.jetbrains.kotlinx.dataframe.api.mapToFrame +import org.jetbrains.kotlinx.dataframe.api.print +import org.jetbrains.kotlinx.dataframe.api.single +import org.jetbrains.kotlinx.dataframe.api.toDataFrame +import org.jetbrains.kotlinx.multik.api.mk +import org.jetbrains.kotlinx.multik.api.rand +import org.jetbrains.kotlinx.multik.ndarray.data.D3Array +import org.jetbrains.kotlinx.multik.ndarray.data.D4Array +import java.time.Month.JULY + +/** + * DataFrames can store anything inside, including Multik ndarrays. + * This can be useful for storing matrices for easier access later or to simply organize data read from other files. + * For example, MRI data is often stored as 3D arrays and sometimes even 4D arrays. + */ +fun main() { + // imaginary list of patient data + @Suppress("ktlint:standard:argument-list-wrapping") + val metadata = listOf( + MriMetadata(10012L, 25, "Healthy", LocalDate(2023, 1, 1)), + MriMetadata(10013L, 45, "Tuberculosis", LocalDate(2023, 2, 15)), + MriMetadata(10014L, 32, "Healthy", LocalDate(2023, 3, 22)), + MriMetadata(10015L, 58, "Pneumonia", LocalDate(2023, 4, 8)), + MriMetadata(10016L, 29, "Tuberculosis", LocalDate(2023, 5, 30)), + MriMetadata(10017L, 42, "Healthy", LocalDate(2023, 6, 15)), + MriMetadata(10018L, 37, "Healthy", LocalDate(2023, 7, 1)), + MriMetadata(10019L, 55, "Healthy", LocalDate(2023, 8, 15)), + MriMetadata(10020L, 28, "Healthy", LocalDate(2023, 9, 1)), + MriMetadata(10021L, 44, "Healthy", LocalDate(2023, 10, 15)), + MriMetadata(10022L, 31, "Healthy", LocalDate(2023, 11, 1)), + ).toDataFrame() + + // "reading" the results from "files" + val results = metadata.mapToFrame { + +patientId + +age + +diagnosis + +scanDate + "t1WeightedMri" from { readT1WeightedMri(patientId) } + "fMriBoldSeries" from { readFMRiBoldSeries(patientId) } + }.cast(verify = true) + .append() + + results.print(borders = true) + + // now when we want to check and visualize the T1-weighted MRI scan + // for that one healthy patient in July, we can do: + val scan = results + .single { scanDate.month == JULY && diagnosis == "Healthy" } + .t1WeightedMri + + // easy :) + visualize(scan) +} + +@DataSchema +data class MriMetadata( + /** Unique patient ID. 
+
+@DataSchema
+data class MriMetadata(
+    /** Unique patient ID. */
+    val patientId: Long,
+    /** Patient age. */
+    val age: Int,
+    /** Clinical diagnosis (e.g. "Healthy", "Tuberculosis"). */
+    val diagnosis: String,
+    /** Date of the scan. */
+    val scanDate: LocalDate,
+)
+
+@DataSchema
+data class MriResults(
+    /** Unique patient ID. */
+    val patientId: Long,
+    /** Patient age. */
+    val age: Int,
+    /** Clinical diagnosis (e.g. "Healthy", "Tuberculosis"). */
+    val diagnosis: String,
+    /** Date of the scan. */
+    val scanDate: LocalDate,
+    /**
+     * T1-weighted anatomical MRI scan.
+     *
+     * Dimensions: (256 x 256 x 180)
+     * - 256 width x 256 height
+     * - 180 slices
+     */
+    val t1WeightedMri: D3Array<Double>,
+    /**
+     * Blood oxygenation level-dependent (BOLD) time series from an fMRI scan.
+     *
+     * Dimensions: (64 x 64 x 30 x 200)
+     * - 64 width x 64 height
+     * - 30 slices
+     * - 200 timepoints
+     */
+    val fMriBoldSeries: D4Array<Double>,
+)
+
+fun readT1WeightedMri(id: Long): D3Array<Double> {
+    // In practice, this should of course read the actual data, but for this example we just return a dummy array.
+    return mk.rand<Double>(256, 256, 180)
+}
+
+fun readFMRiBoldSeries(id: Long): D4Array<Double> {
+    // In practice, this should of course read the actual data, but for this example we just return a dummy array.
+    return mk.rand<Double>(64, 64, 30, 200)
+}
+
+fun visualize(scan: D3Array<Double>) {
+    // This would then actually visualize the scan.
+}
diff --git a/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/spark/compatibilityLayer.kt b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/spark/compatibilityLayer.kt
new file mode 100644
index 0000000000..080a2e1f60
--- /dev/null
+++ b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/spark/compatibilityLayer.kt
@@ -0,0 +1,330 @@
+package org.jetbrains.kotlinx.dataframe.examples.spark
+
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.RowFactory
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.ArrayType
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.DataTypes
+import org.apache.spark.sql.types.Decimal
+import org.apache.spark.sql.types.DecimalType
+import org.apache.spark.sql.types.MapType
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.CalendarInterval
+import org.jetbrains.kotlinx.dataframe.AnyFrame
+import org.jetbrains.kotlinx.dataframe.DataColumn
+import org.jetbrains.kotlinx.dataframe.DataFrame
+import org.jetbrains.kotlinx.dataframe.DataRow
+import org.jetbrains.kotlinx.dataframe.api.rows
+import org.jetbrains.kotlinx.dataframe.api.schema
+import org.jetbrains.kotlinx.dataframe.api.toDataFrame
+import org.jetbrains.kotlinx.dataframe.columns.ColumnGroup
+import org.jetbrains.kotlinx.dataframe.columns.TypeSuggestion
+import org.jetbrains.kotlinx.dataframe.schema.ColumnSchema
+import org.jetbrains.kotlinx.dataframe.schema.DataFrameSchema
+import java.math.BigDecimal
+import java.math.BigInteger
+import java.sql.Date
+import java.sql.Timestamp
+import java.time.Instant
+import java.time.LocalDate
+import kotlin.reflect.KType
+import kotlin.reflect.KTypeProjection
+import kotlin.reflect.full.createType
+import kotlin.reflect.full.isSubtypeOf
+import kotlin.reflect.full.withNullability
+import kotlin.reflect.typeOf
+
+// region Spark to DataFrame
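+
+// (Hypothetical illustration of the mapping below:) a Spark schema such as
+//
+//     root
+//      |-- name: struct (nullable = false)
+//      |    |-- firstName: string
+//      |    |-- lastName: string
+//      |-- age: integer (nullable = false)
+//
+// becomes a DataFrame with a `name` ColumnGroup (containing `firstName` and `lastName`)
+// and a flat `age` value column.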
+
+/**
+ * Converts an untyped Spark [Dataset] (Dataframe) to a Kotlin [DataFrame].
+ * [StructTypes][StructType] are converted to [ColumnGroups][ColumnGroup].
+ *
+ * DataFrame supports type inference to do the conversion automatically.
+ * This is usually fine for smaller data sets, but when working with larger datasets, a type map might be a good idea.
+ * See [convertToDataFrame] for more information.
+ */
+fun Dataset<Row>.convertToDataFrameByInference(
+    schema: StructType = schema(),
+    prefix: List<String> = emptyList(),
+): AnyFrame {
+    val columns = schema.fields().map { field ->
+        val name = field.name()
+        when (val dataType = field.dataType()) {
+            is StructType ->
+                // a column group can be easily created from a dataframe and a name
+                DataColumn.createColumnGroup(
+                    name = name,
+                    df = this.convertToDataFrameByInference(dataType, prefix + name),
+                )
+
+            else ->
+                // we can use DataFrame type inference to create a column with the correct type.
+                // From Spark, we use `select()` to select a single column
+                // and `collectAsList()` to get all the values in a list of single-celled rows
+                DataColumn.createByInference(
+                    name = name,
+                    values = this.select((prefix + name).joinToString("."))
+                        .collectAsList()
+                        .map { it[0] },
+                    suggestedType = TypeSuggestion.Infer,
+                    // Spark provides nullability :) you can leave this out if you want this to be inferred too
+                    nullable = field.nullable(),
+                )
+        }
+    }
+    return columns.toDataFrame()
+}
+
+/**
+ * Converts an untyped Spark [Dataset] (Dataframe) to a Kotlin [DataFrame].
+ * [StructTypes][StructType] are converted to [ColumnGroups][ColumnGroup].
+ *
+ * This version uses a [type map][DataType.convertToDataFrame] to convert the schemas, with a fallback to inference.
+ * For smaller data sets, inference is usually fine too.
+ * See [convertToDataFrameByInference] for more information.
+ */
+fun Dataset<Row>.convertToDataFrame(schema: StructType = schema(), prefix: List<String> = emptyList()): AnyFrame {
+    val columns = schema.fields().map { field ->
+        val name = field.name()
+        when (val dataType = field.dataType()) {
+            is StructType ->
+                // a column group can be easily created from a dataframe and a name
+                DataColumn.createColumnGroup(
+                    name = name,
+                    df = convertToDataFrame(dataType, prefix + name),
+                )
+
+            else ->
+                // we create a column with the correct type using our type map, with a fallback to inference.
+                // From Spark, we use `select()` to select a single column
+                // and `collectAsList()` to get all the values in a list of single-celled rows
+                DataColumn.createByInference(
+                    name = name,
+                    values = select((prefix + name).joinToString("."))
+                        .collectAsList()
+                        .map { it[0] },
+                    suggestedType =
+                        dataType.convertToDataFrame()
+                            ?.let(TypeSuggestion::Use)
+                            ?: TypeSuggestion.Infer, // fallback to inference if needed
+                    nullable = field.nullable(),
+                )
+        }
+    }
+    return columns.toDataFrame()
+}
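+
+// (Hypothetical) usage sketch of the two converters above, assuming a `SparkSession` named `spark`
+// and some "people.json" file:
+//
+//     val sparkDf: Dataset<Row> = spark.read().json("people.json")
+//     val byInference: AnyFrame = sparkDf.convertToDataFrameByInference()
+//     val byTypeMap: AnyFrame = sparkDf.convertToDataFrame()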
+
+/**
+ * Returns the corresponding [Kotlin type][KType] for a given Spark [DataType].
+ *
+ * This list may be incomplete, but it can at least give you a good start.
+ *
+ * @return The [KType] that corresponds to the Spark [DataType], or null if no matching [KType] is found.
+ */
+fun DataType.convertToDataFrame(): KType? =
+    when {
+        this == DataTypes.ByteType -> typeOf<Byte>()
+
+        this == DataTypes.ShortType -> typeOf<Short>()
+
+        this == DataTypes.IntegerType -> typeOf<Int>()
+
+        this == DataTypes.LongType -> typeOf<Long>()
+
+        this == DataTypes.BooleanType -> typeOf<Boolean>()
+
+        this == DataTypes.FloatType -> typeOf<Float>()
+
+        this == DataTypes.DoubleType -> typeOf<Double>()
+
+        this == DataTypes.StringType -> typeOf<String>()
+
+        this == DataTypes.DateType -> typeOf<Date>()
+
+        this == DataTypes.TimestampType -> typeOf<Timestamp>()
+
+        this is DecimalType -> typeOf<BigDecimal>()
+
+        this == DataTypes.CalendarIntervalType -> typeOf<CalendarInterval>()
+
+        this == DataTypes.NullType -> nullableNothingType
+
+        this == DataTypes.BinaryType -> typeOf<ByteArray>()
+
+        this is ArrayType -> {
+            when (elementType()) {
+                DataTypes.ShortType -> typeOf<ShortArray>()
+                DataTypes.IntegerType -> typeOf<IntArray>()
+                DataTypes.LongType -> typeOf<LongArray>()
+                DataTypes.FloatType -> typeOf<FloatArray>()
+                DataTypes.DoubleType -> typeOf<DoubleArray>()
+                DataTypes.BooleanType -> typeOf<BooleanArray>()
+                else -> null
+            }
+        }
+
+        this is MapType -> {
+            val key = keyType().convertToDataFrame() ?: return null
+            val value = valueType().convertToDataFrame() ?: return null
+            Map::class.createType(
+                listOf(
+                    KTypeProjection.invariant(key),
+                    KTypeProjection.invariant(value.withNullability(valueContainsNull())),
+                ),
+            )
+        }
+
+        else -> null
+    }
+
+// endregion
+
+// region DataFrame to Spark
+
+/**
+ * Converts the [DataFrame] to a Spark [Dataset] of [Rows][Row] using the provided [SparkSession] and [JavaSparkContext].
+ *
+ * Spark needs both the data and the schema to be converted to create a correct [Dataset],
+ * so we need to map our types somehow.
+ *
+ * @param spark The [SparkSession] object to use for creating the [Dataset].
+ * @param sc The [JavaSparkContext] object to use for converting the [DataFrame] to an [RDD][JavaRDD].
+ * @return A [Dataset] of [Rows][Row] representing the converted DataFrame.
+ */
+fun DataFrame<*>.convertToSpark(spark: SparkSession, sc: JavaSparkContext): Dataset<Row> {
+    // convert each DataFrame row to a Spark row
+    val rows = sc.parallelize(this.rows().map { it.convertToSpark() })
+    // convert the DataFrame schema to a Spark StructType
+    val schema = this.schema().convertToSpark()
+    return spark.createDataFrame(rows, schema)
+}
+
+/**
+ * Converts a [DataRow] to a Spark [Row] object.
+ *
+ * @return The converted Spark [Row].
+ */
+fun DataRow<*>.convertToSpark(): Row =
+    RowFactory.create(
+        *values().map {
+            when (it) {
+                // a row can be nested inside another row if it's a column group
+                is DataRow<*> -> it.convertToSpark()
+
+                is DataFrame<*> -> error("nested dataframes are not supported")
+
+                else -> it
+            }
+        }.toTypedArray(),
+    )
+
+/**
+ * Converts a [DataFrameSchema] to a Spark [StructType].
+ *
+ * @return The converted Spark [StructType].
+ */
+fun DataFrameSchema.convertToSpark(): StructType =
+    DataTypes.createStructType(
+        this.columns.map { (name, schema) ->
+            DataTypes.createStructField(name, schema.convertToSpark(), schema.nullable)
+        },
+    )
+
+/**
+ * Converts a [ColumnSchema] object to a Spark [DataType].
+ *
+ * @return The Spark [DataType] corresponding to the given [ColumnSchema] object.
+ * @throws IllegalArgumentException if the column type or kind is unknown.
+ */
+fun ColumnSchema.convertToSpark(): DataType =
+    when (this) {
+        is ColumnSchema.Value -> type.convertToSpark() ?: error("unknown data type: $type")
+        is ColumnSchema.Group -> schema.convertToSpark()
+        is ColumnSchema.Frame -> error("nested dataframes are not supported")
+        else -> error("unknown column kind: $this")
+    }
+
+/**
+ * Returns the corresponding Spark [DataType] for a given [Kotlin type][KType].
+ *
+ * This list may be incomplete, but it can at least give you a good start.
+ *
+ * @return The Spark [DataType] that corresponds to the [Kotlin type][KType], or null if no matching [DataType] is found.
+ */
+fun KType.convertToSpark(): DataType? =
+    when {
+        // check this first: `Nothing?` (the type of all-null columns) is a subtype of every nullable type below
+        isSubtypeOf(nullableNothingType) -> DataTypes.NullType
+
+        isSubtypeOf(typeOf<Byte?>()) -> DataTypes.ByteType
+
+        isSubtypeOf(typeOf<Short?>()) -> DataTypes.ShortType
+
+        isSubtypeOf(typeOf<Int?>()) -> DataTypes.IntegerType
+
+        isSubtypeOf(typeOf<Long?>()) -> DataTypes.LongType
+
+        isSubtypeOf(typeOf<Boolean?>()) -> DataTypes.BooleanType
+
+        isSubtypeOf(typeOf<Float?>()) -> DataTypes.FloatType
+
+        isSubtypeOf(typeOf<Double?>()) -> DataTypes.DoubleType
+
+        isSubtypeOf(typeOf<String?>()) -> DataTypes.StringType
+
+        isSubtypeOf(typeOf<Date?>()) -> DataTypes.DateType
+
+        isSubtypeOf(typeOf<LocalDate?>()) -> DataTypes.DateType
+
+        isSubtypeOf(typeOf<Timestamp?>()) -> DataTypes.TimestampType
+
+        isSubtypeOf(typeOf<Instant?>()) -> DataTypes.TimestampType
+
+        isSubtypeOf(typeOf<BigDecimal?>()) -> DecimalType.SYSTEM_DEFAULT()
+
+        isSubtypeOf(typeOf<BigInteger?>()) -> DecimalType.SYSTEM_DEFAULT()
+
+        isSubtypeOf(typeOf<Decimal?>()) -> DecimalType.SYSTEM_DEFAULT()
+
+        isSubtypeOf(typeOf<CalendarInterval?>()) -> DataTypes.CalendarIntervalType
+
+        isSubtypeOf(typeOf<ByteArray?>()) -> DataTypes.BinaryType
+
+        isSubtypeOf(typeOf<ShortArray?>()) -> DataTypes.createArrayType(DataTypes.ShortType, false)
+
+        isSubtypeOf(typeOf<IntArray?>()) -> DataTypes.createArrayType(DataTypes.IntegerType, false)
+
+        isSubtypeOf(typeOf<LongArray?>()) -> DataTypes.createArrayType(DataTypes.LongType, false)
+
+        isSubtypeOf(typeOf<FloatArray?>()) -> DataTypes.createArrayType(DataTypes.FloatType, false)
+
+        isSubtypeOf(typeOf<DoubleArray?>()) -> DataTypes.createArrayType(DataTypes.DoubleType, false)
+
+        isSubtypeOf(typeOf<BooleanArray?>()) -> DataTypes.createArrayType(DataTypes.BooleanType, false)
+
+        isSubtypeOf(typeOf<Array<*>?>()) ->
+            error("non-primitive arrays are not supported for now, you can add it yourself")
+
+        isSubtypeOf(typeOf<List<*>?>()) -> error("lists are not supported for now, you can add it yourself")
+
+        isSubtypeOf(typeOf<Set<*>?>()) -> error("sets are not supported for now, you can add it yourself")
+
+        classifier == Map::class -> {
+            val (key, value) = arguments
+            DataTypes.createMapType(
+                key.type?.convertToSpark(),
+                value.type?.convertToSpark(),
+                value.type?.isMarkedNullable ?: true,
+            )
+        }
+
+        else -> null
+    }
+
+private val nullableNothingType: KType = typeOf<List<Nothing?>>().arguments.first().type!!
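+
+// (Hypothetical) round-trip sketch using the mappings above, assuming `df: DataFrame<*>`,
+// a `SparkSession` `spark`, and a `JavaSparkContext` `sc` are in scope:
+//
+//     val ds: Dataset<Row> = df.convertToSpark(spark, sc)
+//     val back: AnyFrame = ds.convertToDataFrame()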
+
+// endregion
diff --git a/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/spark/typedDataset.kt b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/spark/typedDataset.kt
new file mode 100644
index 0000000000..5f16f0fe08
--- /dev/null
+++ b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/spark/typedDataset.kt
@@ -0,0 +1,105 @@
+@file:Suppress("ktlint:standard:function-signature")
+
+package org.jetbrains.kotlinx.dataframe.examples.spark
+
+import org.apache.spark.SparkConf
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.SparkSession
+import org.jetbrains.kotlinx.dataframe.annotations.DataSchema
+import org.jetbrains.kotlinx.dataframe.api.aggregate
+import org.jetbrains.kotlinx.dataframe.api.groupBy
+import org.jetbrains.kotlinx.dataframe.api.max
+import org.jetbrains.kotlinx.dataframe.api.mean
+import org.jetbrains.kotlinx.dataframe.api.min
+import org.jetbrains.kotlinx.dataframe.api.print
+import org.jetbrains.kotlinx.dataframe.api.schema
+import org.jetbrains.kotlinx.dataframe.api.std
+import org.jetbrains.kotlinx.dataframe.api.toDataFrame
+import org.jetbrains.kotlinx.dataframe.api.toList
+import java.io.Serializable
+
+/**
+ * For Spark, Kotlin data classes are supported if we:
+ * - Add [@JvmOverloads][JvmOverloads] to the constructor
+ * - Make all constructor parameters mutable (`var`) and give them default values
+ * - Make the class [Serializable]
+ *
+ * But by adding [@DataSchema][DataSchema] we can reuse the same class for Spark and DataFrame!
+ *
+ * See [Person] and [Name] for an example.
+ *
+ * Also, since we use an actual class to define the schema, we need no type conversion!
+ *
+ * NOTE: You will likely need to run this function with Java 8 or 11 for it to work correctly.
+ * Use the `runSparkTypedDataset` Gradle task to do so.
+ */
+fun main() {
+    val spark = SparkSession.builder()
+        .master(SparkConf().get("spark.master", "local[*]"))
+        .appName("Kotlin Spark Sample")
+        .getOrCreate()
+    val sc = JavaSparkContext(spark.sparkContext())
+
+    // Creating a Spark Dataset. Usually, this is loaded from some server or database.
+    val rawDataset: Dataset<Person> = spark.createDataset(
+        listOf(
+            Person(Name("Alice", "Cooper"), 15, "London", 54, true),
+            Person(Name("Bob", "Dylan"), 45, "Dubai", 87, true),
+            Person(Name("Charlie", "Daniels"), 20, "Moscow", null, false),
+            Person(Name("Charlie", "Chaplin"), 40, "Milan", null, true),
+            Person(Name("Bob", "Marley"), 30, "Tokyo", 68, true),
+            Person(Name("Alice", "Wolf"), 20, null, 55, false),
+            Person(Name("Charlie", "Byrd"), 30, "Moscow", 90, true),
+        ),
+        beanEncoderOf<Person>(),
+    )
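+
+    // (Optional, hypothetical peek at the Spark-side schema of the bean encoding:)
+    // rawDataset.printSchema()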
+
+    // we can perform large operations in Spark.
+    // DataFrames are in-memory structures, so this is a good place to limit the number of rows if you don't have the RAM ;)
+    val dataset = rawDataset.filter { it.age > 17 }
+
+    // and convert it to a DataFrame via a typed List
+    val dataframe = dataset.collectAsList().toDataFrame()
+    dataframe.schema().print()
+    dataframe.print(columnTypes = true, borders = true)
+
+    // now we can use DataFrame-specific functions
+    val ageStats = dataframe
+        .groupBy { city }.aggregate {
+            mean { age } into "meanAge"
+            std { age } into "stdAge"
+            min { age } into "minAge"
+            max { age } into "maxAge"
+        }
+
+    ageStats.print(columnTypes = true, borders = true)
+
+    // and when we want to convert a DataFrame back to Spark, we can do the same trick via a typed List
+    val sparkDatasetAgain = spark.createDataset(dataframe.toList(), beanEncoderOf<Person>())
+    sparkDatasetAgain.printSchema()
+    sparkDatasetAgain.show()
+
+    spark.stop()
+}
+
+/** Creates a [bean encoder][Encoders.bean] for the given type [T]. */
+inline fun <reified T> beanEncoderOf(): Encoder<T> = Encoders.bean(T::class.java)
+
+@DataSchema
+data class Name
+    @JvmOverloads
+    constructor(var firstName: String = "", var lastName: String = "") : Serializable
+
+@DataSchema
+data class Person
+    @JvmOverloads
+    constructor(
+        var name: Name = Name(),
+        var age: Int = -1,
+        var city: String? = null,
+        var weight: Int? = null,
+        var isHappy: Boolean = false,
+    ) : Serializable
diff --git a/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/spark/untypedDataset.kt b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/spark/untypedDataset.kt
new file mode 100644
index 0000000000..b037d70bc8
--- /dev/null
+++ b/examples/idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/spark/untypedDataset.kt
@@ -0,0 +1,87 @@
+@file:Suppress("ktlint:standard:function-signature")
+
+package org.jetbrains.kotlinx.dataframe.examples.spark
+
+import org.apache.spark.SparkConf
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
+import org.jetbrains.kotlinx.dataframe.api.aggregate
+import org.jetbrains.kotlinx.dataframe.api.groupBy
+import org.jetbrains.kotlinx.dataframe.api.max
+import org.jetbrains.kotlinx.dataframe.api.mean
+import org.jetbrains.kotlinx.dataframe.api.min
+import org.jetbrains.kotlinx.dataframe.api.print
+import org.jetbrains.kotlinx.dataframe.api.schema
+import org.jetbrains.kotlinx.dataframe.api.std
+import org.jetbrains.kotlinx.dataframe.examples.spark.convertToDataFrame
+import org.jetbrains.kotlinx.dataframe.examples.spark.convertToDataFrameByInference
+import org.jetbrains.kotlinx.dataframe.examples.spark.convertToSpark
+import org.jetbrains.kotlinx.spark.api.col
+import org.jetbrains.kotlinx.spark.api.gt
+
+/**
+ * Since, this time, we don't know the schema at compile time, we need to do
+ * some schema mapping between Spark and DataFrame.
+ *
+ * We will use spark/compatibilityLayer.kt to do this.
+ * Take a look at that file for the implementation details!
+ *
+ * NOTE: You will likely need to run this function with Java 8 or 11 for it to work correctly.
+ * Use the `runSparkUntypedDataset` Gradle task to do so.
+ */
+fun main() {
+    val spark = SparkSession.builder()
+        .master(SparkConf().get("spark.master", "local[*]"))
+        .appName("Kotlin Spark Sample")
+        .getOrCreate()
+    val sc = JavaSparkContext(spark.sparkContext())
+
+    // Creating a Spark Dataframe (untyped Dataset). Usually, this is loaded from some server or database.
+    val rawDataset: Dataset<Row> = spark.createDataset(
+        listOf(
+            Person(Name("Alice", "Cooper"), 15, "London", 54, true),
+            Person(Name("Bob", "Dylan"), 45, "Dubai", 87, true),
+            Person(Name("Charlie", "Daniels"), 20, "Moscow", null, false),
+            Person(Name("Charlie", "Chaplin"), 40, "Milan", null, true),
+            Person(Name("Bob", "Marley"), 30, "Tokyo", 68, true),
+            Person(Name("Alice", "Wolf"), 20, null, 55, false),
+            Person(Name("Charlie", "Byrd"), 30, "Moscow", 90, true),
+        ),
+        beanEncoderOf<Person>(),
+    ).toDF()
+
+    // we can perform large operations in Spark.
+    // DataFrames are in-memory structures, so this is a good place to limit the number of rows if you don't have the RAM ;)
+    val dataset = rawDataset.filter(col("age") gt 17)
+
+    // Using inference
+    val df1 = dataset.convertToDataFrameByInference()
+    df1.schema().print()
+    df1.print(columnTypes = true, borders = true)
+
+    // Using full schema mapping
+    val df2 = dataset.convertToDataFrame()
+    df2.schema().print()
+    df2.print(columnTypes = true, borders = true)
+
+    // now we can use DataFrame-specific functions
+    val ageStats = df1
+        .groupBy("city").aggregate {
+            mean("age") into "meanAge"
+            std("age") into "stdAge"
+            min("age") into "minAge"
+            max("age") into "maxAge"
+        }
+
+    ageStats.print(columnTypes = true, borders = true)
+
+    // and when we want to convert a DataFrame back to Spark, we use the `convertToSpark()` extension function.
+    // This performs the necessary schema mapping under the hood.
+    val sparkDataset = df2.convertToSpark(spark, sc)
+    sparkDataset.printSchema()
+    sparkDataset.show()
+
+    spark.stop()
+}
diff --git a/examples/idea-examples/unsupported-data-sources/src/main/resources/a1d.npy b/examples/idea-examples/unsupported-data-sources/src/main/resources/a1d.npy
new file mode 100644
index 0000000000..80c6ff86eb
Binary files /dev/null and b/examples/idea-examples/unsupported-data-sources/src/main/resources/a1d.npy differ
diff --git a/examples/idea-examples/unsupported-data-sources/src/main/resources/chinook.db b/examples/idea-examples/unsupported-data-sources/src/main/resources/chinook.db
new file mode 100644
index 0000000000..327d2a08ba
Binary files /dev/null and b/examples/idea-examples/unsupported-data-sources/src/main/resources/chinook.db differ
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 9c5e78ca9f..578d0d75fc 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -37,6 +37,7 @@ kotlinDatetime = "0.6.1"
 openapi = "2.1.24"
 kotlinLogging = "7.0.3"
 sl4j = "2.0.16"
+log4j = "2.24.3"
 
 junit = "4.13.2"
 junit-jupiter = "5.11.3"
@@ -62,6 +63,13 @@ jai-core = "1.1.3"
 jts = "1.20.0"
 kandy = "0.8.1-dev-66"
+exposed = "1.0.0-beta-2"
+
+# check the versions down in the [libraries] section too!
+kotlin-spark = "1.2.4" +spark = "3.3.2" + +multik = "0.2.3" [libraries] ksp-gradle = { group = "com.google.devtools.ksp", name = "symbol-processing-gradle-plugin", version.ref = "ksp" } @@ -156,7 +164,21 @@ kotlin-jupyter-test-kit = { group = "org.jetbrains.kotlinx", name = "kotlin-jupy kotlinx-benchmark-runtime = { group = "org.jetbrains.kotlinx", name = "kotlinx-benchmark-runtime", version.ref = "benchmark" } dataframe-symbol-processor = { group = "org.jetbrains.kotlinx.dataframe", name = "symbol-processor-all" } -duckdb-jdbc = { group = "org.duckdb", name = "duckdb_jdbc", version.ref= "duckdb"} +duckdb-jdbc = { group = "org.duckdb", name = "duckdb_jdbc", version.ref = "duckdb" } + +exposed-core = { group = "org.jetbrains.exposed", name = "exposed-core", version.ref = "exposed" } +exposed-jdbc = { group = "org.jetbrains.exposed", name = "exposed-jdbc", version.ref = "exposed" } +exposed-kotlin-datetime = { group = "org.jetbrains.exposed", name = "exposed-kotlin-datetime", version.ref = "exposed" } +exposed-json = { group = "org.jetbrains.exposed", name = "exposed-json", version.ref = "exposed" } +exposed-money = { group = "org.jetbrains.exposed", name = "exposed-money", version.ref = "exposed" } + +kotlin-spark = { group = "org.jetbrains.kotlinx.spark", name = "kotlin-spark-api_3.3.2_2.13", version.ref = "kotlin-spark" } +spark = { group = "org.apache.spark", name = "spark-sql_2.13", version.ref = "spark" } +log4j-core = { group = "org.apache.logging.log4j", name = "log4j-core", version.ref = "log4j" } +log4j-api = { group = "org.apache.logging.log4j", name = "log4j-api", version.ref = "log4j" } + +multik-core = { group = "org.jetbrains.kotlinx", name = "multik-core", version.ref = "multik" } +multik-default = { group = "org.jetbrains.kotlinx", name = "multik-default", version.ref = "multik" } [plugins] jupyter-api = { id = "org.jetbrains.kotlin.jupyter.api", version.ref = "kotlinJupyter" } diff --git a/settings.gradle.kts b/settings.gradle.kts index 554a765ed1..d21330810d 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -29,6 +29,7 @@ include("examples:idea-examples:titanic") include("examples:idea-examples:movies") include("examples:idea-examples:youtube") include("examples:idea-examples:json") +include("examples:idea-examples:unsupported-data-sources") includeBuild("examples/kotlin-dataframe-plugin-example") val jupyterApiTCRepo: String by settings