diff --git a/spark/Dockerfile b/spark/Dockerfile index d8cb9fb..0a430a6 100644 --- a/spark/Dockerfile +++ b/spark/Dockerfile @@ -36,12 +36,6 @@ RUN pip3 install -r requirements.txt # Add scala kernel via spylon-kernel RUN python3 -m spylon_kernel install -# Download and install IJava jupyter kernel -RUN curl https://github.com/SpencerPark/IJava/releases/download/v1.3.0/ijava-1.3.0.zip -Lo ijava-1.3.0.zip \ - && unzip ijava-1.3.0.zip \ - && python3 install.py --sys-prefix \ - && rm ijava-1.3.0.zip - # Optional env variables ENV SPARK_HOME=${SPARK_HOME:-"/opt/spark"} ENV PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH @@ -77,9 +71,6 @@ RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2 && rm awscliv2.zip \ && rm -rf aws/ -# Add iceberg spark runtime jar to IJava classpath -ENV IJAVA_CLASSPATH=/opt/spark/jars/* - RUN mkdir -p /home/iceberg/data \ && curl https://data.cityofnewyork.us/resource/tg4x-b46p.json > /home/iceberg/data/nyc_film_permits.json \ && curl https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-04.parquet -o /home/iceberg/data/yellow_tripdata_2022-04.parquet \ diff --git a/spark/entrypoint.sh b/spark/entrypoint.sh index 2738b59..704f316 100755 --- a/spark/entrypoint.sh +++ b/spark/entrypoint.sh @@ -19,6 +19,7 @@ start-master.sh -p 7077 start-worker.sh spark://spark-iceberg:7077 + start-history-server.sh start-thriftserver.sh --driver-java-options "-Dderby.system.home=/tmp/derby" diff --git a/spark/notebooks/Iceberg - An Introduction to the Iceberg Java API using Scala.ipynb b/spark/notebooks/Iceberg - An Introduction to the Iceberg Java API using Scala.ipynb new file mode 100644 index 0000000..6db9d18 --- /dev/null +++ b/spark/notebooks/Iceberg - An Introduction to the Iceberg Java API using Scala.ipynb @@ -0,0 +1,526 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "16f6bb49", + "metadata": {}, + "source": [ + "![iceberg-logo](https://www.apache.org/logos/res/iceberg/iceberg.png)" + ] + }, + { + "cell_type": "markdown", + "id": "c82657e9", + "metadata": {}, + "source": [ + "# An Introduction to the Iceberg Scala API" + ] + }, + { + "cell_type": "markdown", + "id": "3ee90ad2", + "metadata": {}, + "source": [ + "## [Part 1 - Loading a Catalog and Creating a Table](https://tabular.io/blog/java-api-part-1/)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f5c29777-3fd4-4fb1-81c4-799db166ebf7", + "metadata": {}, + "outputs": [], + "source": [ + "spark" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "72e68c62", + "metadata": {}, + "outputs": [], + "source": [ + "import org.apache.iceberg.catalog.Catalog\n", + "import org.apache.hadoop.conf.Configuration\n", + "import org.apache.iceberg.CatalogProperties\n", + "import org.apache.iceberg.rest.RESTCatalog\n", + "import org.apache.iceberg.aws.s3.S3FileIOProperties\n", + "\n", + "import scala.collection.JavaConverters._\n", + "\n", + "val properties: Map[String, String] = Map(\n", + " CatalogProperties.CATALOG_IMPL -> \"org.apache.iceberg.rest.RESTCatalog\",\n", + " CatalogProperties.URI -> \"http://rest:8181\",\n", + " CatalogProperties.WAREHOUSE_LOCATION -> \"s3a://warehouse/wh\",\n", + " CatalogProperties.FILE_IO_IMPL -> \"org.apache.iceberg.aws.s3.S3FileIO\",\n", + " S3FileIOProperties.ENDPOINT -> \"http://minio:9000\"\n", + ")\n", + "\n", + "val catalog = new RESTCatalog()\n", + "val conf = new Configuration()\n", + "catalog.setConf(conf)\n", + "catalog.initialize(\"demo\", properties.asJava)\n", + "catalog.name()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4be615e7", + "metadata": {}, + "outputs": [], + "source": [ + "import org.apache.iceberg.Schema\n", + "import org.apache.iceberg.types.Types\n", + "\n", + "val schema = new Schema(\n", + " Types.NestedField.required(1, \"level\", Types.StringType.get()),\n", + " Types.NestedField.required(2, \"event_time\", Types.TimestampType.withZone()),\n", + " Types.NestedField.required(3, \"message\", Types.StringType.get()),\n", + " Types.NestedField.optional(4, \"call_stack\", Types.ListType.ofRequired(5, Types.StringType.get()))\n", + ")\n", + "\n", + "schema" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b7299d16", + "metadata": {}, + "outputs": [], + "source": [ + "import org.apache.iceberg.PartitionSpec\n", + "\n", + "val spec = PartitionSpec.builderFor(schema).hour(\"event_time\").identity(\"level\").build()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4d900c97", + "metadata": {}, + "outputs": [], + "source": [ + "import org.apache.iceberg.catalog.TableIdentifier\n", + "import org.apache.iceberg.catalog.Namespace\n", + "\n", + "val nyc = Namespace.of(\"nyc\")\n", + "val name = TableIdentifier.of(nyc, \"logs\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c0ed4251-1ccf-4ab2-bc1f-077df739b892", + "metadata": {}, + "outputs": [], + "source": [ + "val sql = s\"DROP TABLE IF EXISTS $name\"\n", + "spark.sql(sql)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8a4d8a6e", + "metadata": {}, + "outputs": [], + "source": [ + "catalog.createTable(name, schema, spec)" + ] + }, + { + "cell_type": "markdown", + "id": "fe62e0a9", + "metadata": {}, + "source": [ + "## [Part 2 - Table Scans](https://tabular.io/blog/java-api-part-2/)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e4bcdf80-720d-4ea0-8296-eb93ffe8bfc0", + "metadata": {}, + "outputs": [], + "source": [ + "val sql = s\"DROP TABLE IF EXISTS $name\"\n", + "spark.sql(sql)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c1e7aa7a", + "metadata": {}, + "outputs": [], + "source": [ + "catalog.createTable(name, schema, spec)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0b17f820", + "metadata": {}, + "outputs": [], + "source": [ + "val query =\n", + " \"\"\"INSERT INTO demo.nyc.logs\n", + " |VALUES\n", + " |('info', timestamp 'today', 'Just letting you know!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3')),\n", + " |('warning', timestamp 'today', 'You probably should not do this!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3')),\n", + " |('error', timestamp 'today', 'This was a fatal application error!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3'))\n", + " |\"\"\".stripMargin\n", + "\n", + "spark.sql(query).show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "15ca1822", + "metadata": {}, + "outputs": [], + "source": [ + "import org.apache.iceberg.catalog.Catalog\n", + "import org.apache.hadoop.conf.Configuration\n", + "import org.apache.iceberg.CatalogProperties\n", + "import org.apache.iceberg.rest.RESTCatalog\n", + "\n", + "val properties: Map[String, String] = Map(\n", + " CatalogProperties.CATALOG_IMPL -> \"org.apache.iceberg.rest.RESTCatalog\",\n", + " CatalogProperties.URI -> \"http://rest:8181\",\n", + " CatalogProperties.WAREHOUSE_LOCATION -> \"s3a://warehouse/wh/\",\n", + " CatalogProperties.FILE_IO_IMPL -> \"org.apache.iceberg.aws.s3.S3FileIO\",\n", + " S3FileIOProperties.ENDPOINT -> \"http://minio:9000\"\n", + ");\n", + "\n", + "val catalog = new RESTCatalog()\n", + "val conf = new Configuration()\n", + "catalog.setConf(conf)\n", + "catalog.initialize(\"demo\", properties.asJava)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3a5cf423", + "metadata": {}, + "outputs": [], + "source": [ + "import org.apache.iceberg.Table\n", + "import org.apache.iceberg.TableScan\n", + "import org.apache.iceberg.catalog.Namespace\n", + "import org.apache.iceberg.catalog.TableIdentifier\n", + "\n", + "val nyc = Namespace.of(\"nyc\")\n", + "val name = TableIdentifier.of(nyc, \"logs\")\n", + "val table = catalog.loadTable(name)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e472d6a1", + "metadata": {}, + "outputs": [], + "source": [ + "import org.apache.iceberg.io.CloseableIterable\n", + "import org.apache.iceberg.data.Record\n", + "import org.apache.iceberg.data.IcebergGenerics\n", + "\n", + "val result = IcebergGenerics.read(table).build()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0d32f41c", + "metadata": {}, + "outputs": [], + "source": [ + "result.asScala.foreach(println)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7dffc238", + "metadata": {}, + "outputs": [], + "source": [ + "import org.apache.iceberg.expressions.Expressions\n", + "\n", + "val result = IcebergGenerics.read(table).where(Expressions.equal(\"level\", \"error\")).build()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ec2b0431", + "metadata": {}, + "outputs": [], + "source": [ + "import org.apache.iceberg.CombinedScanTask\n", + "import org.apache.iceberg.TableScan\n", + "\n", + "val scan = table.newScan()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "09d13c6b", + "metadata": {}, + "outputs": [], + "source": [ + "import org.apache.iceberg.expressions.Expressions\n", + "\n", + "val filteredScan = scan.filter(Expressions.equal(\"level\", \"info\")).select(\"message\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1857c10f", + "metadata": {}, + "outputs": [], + "source": [ + "val result = filteredScan.planTasks()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ea206ec7", + "metadata": {}, + "outputs": [], + "source": [ + "import org.apache.iceberg.DataFile;\n", + "\n", + "val task = result.iterator().next();\n", + "val dataFile = task.files().iterator().next().file();\n", + "System.out.println(dataFile);" + ] + }, + { + "cell_type": "markdown", + "id": "41e9e10f", + "metadata": {}, + "source": [ + "## [Part 3 - Table Scans](https://tabular.io/blog/java-api-part-3/)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "29574130-7a69-4766-89e5-e3ced476518b", + "metadata": {}, + "outputs": [], + "source": [ + "val webapp = Namespace.of(\"webapp\")\n", + "val name = TableIdentifier.of(webapp, \"user_events\")\n", + "\n", + "spark.sql(s\"DROP TABLE IF EXISTS $name\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "81033412", + "metadata": {}, + "outputs": [], + "source": [ + "import org.apache.iceberg.Schema\n", + "import org.apache.iceberg.types.Types\n", + "import org.apache.iceberg.catalog.{Namespace, TableIdentifier}\n", + "import org.apache.iceberg.PartitionSpec\n", + "\n", + "val schema = new Schema(\n", + " Types.NestedField.optional(1, \"event_id\", Types.StringType.get()),\n", + " Types.NestedField.optional(2, \"username\", Types.StringType.get()),\n", + " Types.NestedField.optional(3, \"userid\", Types.IntegerType.get()),\n", + " Types.NestedField.optional(4, \"api_version\", Types.StringType.get()),\n", + " Types.NestedField.optional(5, \"command\", Types.StringType.get())\n", + ")\n", + "\n", + "catalog.createTable(name, schema, PartitionSpec.unpartitioned())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "12c45c6b", + "metadata": {}, + "outputs": [], + "source": [ + "import java.util.UUID\n", + "import com.google.common.collect.{ImmutableList, ImmutableMap}\n", + "import scala.jdk.CollectionConverters._\n", + "import org.apache.iceberg.data.GenericRecord\n", + "\n", + "val record = GenericRecord.create(schema)\n", + "val builder = ImmutableList.builder[GenericRecord]()\n", + "\n", + "val records = List(\n", + " Map(\n", + " \"event_id\" -> UUID.randomUUID().toString,\n", + " \"username\" -> \"Bruce\",\n", + " \"userid\" -> 1.asInstanceOf[AnyRef],\n", + " \"api_version\" -> \"1.0\",\n", + " \"command\" -> \"grapple\"\n", + " ),\n", + " Map(\n", + " \"event_id\" -> UUID.randomUUID().toString,\n", + " \"username\" -> \"Wayne\",\n", + " \"userid\" -> 1.asInstanceOf[AnyRef],\n", + " \"api_version\" -> \"1.0\",\n", + " \"command\" -> \"glide\"\n", + " ),\n", + " Map(\n", + " \"event_id\" -> UUID.randomUUID().toString,\n", + " \"username\" -> \"Clark\",\n", + " \"userid\" -> 1.asInstanceOf[AnyRef],\n", + " \"api_version\" -> \"2.0\",\n", + " \"command\" -> \"fly\"\n", + " ),\n", + " Map(\n", + " \"event_id\" -> UUID.randomUUID().toString,\n", + " \"username\" -> \"Kent\",\n", + " \"userid\" -> 1.asInstanceOf[AnyRef],\n", + " \"api_version\" -> \"1.0\",\n", + " \"command\" -> \"land\"\n", + " )\n", + ").map(data => record.copy(data.mapValues(_.asInstanceOf[AnyRef]).toMap.asJava)).foreach(builder.add)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "83bc5319", + "metadata": {}, + "outputs": [], + "source": [ + "import java.util.UUID\n", + "import org.apache.iceberg.io.{DataWriter, OutputFile}\n", + "import org.apache.iceberg.parquet.Parquet\n", + "import org.apache.iceberg.data.GenericRecord\n", + "import org.apache.iceberg.data.parquet.GenericParquetWriter\n", + "import org.apache.iceberg.PartitionSpec\n", + "\n", + "val filepath = s\"${table.location()}/${UUID.randomUUID().toString}\"\n", + "val file: OutputFile = table.io().newOutputFile(filepath)\n", + "\n", + "val dataWriter: DataWriter[GenericRecord] =\n", + " Parquet.writeData(file)\n", + " .schema(schema)\n", + " .createWriterFunc(GenericParquetWriter.buildWriter)\n", + " .overwrite()\n", + " .withSpec(PartitionSpec.unpartitioned())\n", + " .build()\n", + "\n", + "try {\n", + " for (record <- builder.build().asScala) {\n", + " dataWriter.write(record)\n", + " }\n", + "} finally {\n", + " dataWriter.close()\n", + "}\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "469e6af4", + "metadata": {}, + "outputs": [], + "source": [ + "import org.apache.iceberg.DataFile\n", + "\n", + "val dataFile = dataWriter.toDataFile()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "142b6ed1", + "metadata": {}, + "outputs": [], + "source": [ + "import org.apache.iceberg.catalog.Namespace\n", + "import org.apache.iceberg.catalog.TableIdentifier\n", + "import org.apache.iceberg.Table;\n", + "\n", + "val webapp = Namespace.of(\"webapp\");\n", + "val name = TableIdentifier.of(webapp, \"user_events\");\n", + "val tbl = catalog.loadTable(name);\n", + "tbl.newAppend().appendFile(dataFile).commit()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c61e9e79", + "metadata": {}, + "outputs": [], + "source": [ + "import org.apache.iceberg.io.CloseableIterable\n", + "import org.apache.iceberg.data.Record\n", + "import org.apache.iceberg.data.IcebergGenerics\n", + "\n", + "val result = IcebergGenerics.read(tbl).build();\n", + "result.asScala.foreach(println)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "801aec53-47e3-4441-bd15-6dd7a4cabb21", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8bc6dc94-396f-42ae-ac60-bb27cf86ea13", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c80f1225-f73a-44ec-b53a-58d652a34a45", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "spylon-kernel", + "language": "scala", + "name": "spylon-kernel" + }, + "language_info": { + "codemirror_mode": "text/x-scala", + "file_extension": ".scala", + "help_links": [ + { + "text": "MetaKernel Magics", + "url": "https://metakernel.readthedocs.io/en/latest/source/README.html" + } + ], + "mimetype": "text/x-scala", + "name": "scala", + "pygments_lexer": "scala", + "version": "0.4.1" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/spark/notebooks/Iceberg - An Introduction to the Iceberg Java API.ipynb b/spark/notebooks/Iceberg - An Introduction to the Iceberg Java API.ipynb deleted file mode 100644 index 20af42b..0000000 --- a/spark/notebooks/Iceberg - An Introduction to the Iceberg Java API.ipynb +++ /dev/null @@ -1,469 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "id": "16f6bb49", - "metadata": {}, - "source": [ - "![iceberg-logo](https://www.apache.org/logos/res/iceberg/iceberg.png)" - ] - }, - { - "cell_type": "markdown", - "id": "c82657e9", - "metadata": {}, - "source": [ - "# An Introduction to the Iceberg Java API" - ] - }, - { - "cell_type": "markdown", - "id": "3ee90ad2", - "metadata": {}, - "source": [ - "## [Part 1 - Loading a Catalog and Creating a Table](https://tabular.io/blog/java-api-part-1/)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "72e68c62", - "metadata": {}, - "outputs": [], - "source": [ - "import org.apache.iceberg.catalog.Catalog;\n", - "import org.apache.hadoop.conf.Configuration;\n", - "import org.apache.iceberg.CatalogProperties;\n", - "import org.apache.iceberg.rest.RESTCatalog;\n", - "import org.apache.iceberg.aws.s3.S3FileIOProperties;\n", - "\n", - "Map properties = new HashMap<>();\n", - "\n", - "properties.put(CatalogProperties.CATALOG_IMPL, \"org.apache.iceberg.rest.RESTCatalog\");\n", - "properties.put(CatalogProperties.URI, \"http://rest:8181\");\n", - "properties.put(CatalogProperties.WAREHOUSE_LOCATION, \"s3a://warehouse/wh\");\n", - "properties.put(CatalogProperties.FILE_IO_IMPL, \"org.apache.iceberg.aws.s3.S3FileIO\");\n", - "properties.put(S3FileIOProperties.ENDPOINT, \"http://minio:9000\");\n", - "\n", - "RESTCatalog catalog = new RESTCatalog();\n", - "Configuration conf = new Configuration();\n", - "catalog.setConf(conf);\n", - "catalog.initialize(\"demo\", properties);\n", - "catalog.name();" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "4be615e7", - "metadata": {}, - "outputs": [], - "source": [ - "import org.apache.iceberg.Schema;\n", - "import org.apache.iceberg.types.Types;\n", - "\n", - "Schema schema = new Schema(\n", - " Types.NestedField.required(1, \"level\", Types.StringType.get()),\n", - " Types.NestedField.required(2, \"event_time\", Types.TimestampType.withZone()),\n", - " Types.NestedField.required(3, \"message\", Types.StringType.get()),\n", - " Types.NestedField.optional(4, \"call_stack\", Types.ListType.ofRequired(5, Types.StringType.get()))\n", - " );\n", - "schema" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "b7299d16", - "metadata": {}, - "outputs": [], - "source": [ - "import org.apache.iceberg.PartitionSpec;\n", - "\n", - "PartitionSpec spec = PartitionSpec.builderFor(schema)\n", - " .hour(\"event_time\")\n", - " .identity(\"level\")\n", - " .build();\n", - "spec" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "4d900c97", - "metadata": {}, - "outputs": [], - "source": [ - "import org.apache.iceberg.catalog.TableIdentifier;\n", - "import org.apache.iceberg.catalog.Namespace;\n", - "\n", - "Namespace nyc = Namespace.of(\"nyc\");\n", - "TableIdentifier name = TableIdentifier.of(nyc, \"logs\");\n", - "name" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "8a4d8a6e", - "metadata": {}, - "outputs": [], - "source": [ - "catalog.createTable(name, schema, spec)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "7d8c46df", - "metadata": {}, - "outputs": [], - "source": [ - "catalog.dropTable(name)" - ] - }, - { - "cell_type": "markdown", - "id": "fe62e0a9", - "metadata": {}, - "source": [ - "## [Part 2 - Table Scans](https://tabular.io/blog/java-api-part-2/)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "c1e7aa7a", - "metadata": {}, - "outputs": [], - "source": [ - "catalog.createTable(name, schema, spec)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "78c95e06", - "metadata": {}, - "outputs": [], - "source": [ - "import org.apache.spark.sql.SparkSession;\n", - "\n", - "SparkSession spark = SparkSession\n", - " .builder()\n", - " .master(\"local[*]\")\n", - " .appName(\"Java API Demo\")\n", - " .config(\"spark.sql.extensions\", \"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\")\n", - " .config(\"spark.sql.catalog.demo\", \"org.apache.iceberg.spark.SparkCatalog\")\n", - " .config(\"spark.sql.catalog.demo.catalog-impl\", \"org.apache.iceberg.rest.RESTCatalog\")\n", - " .config(\"spark.sql.catalog.demo.uri\", \"http://rest:8181\")\n", - " .config(\"spark.sql.catalog.demo.io-impl\", \"org.apache.iceberg.aws.s3.S3FileIO\")\n", - " .config(\"spark.sql.catalog.demo.s3.endpoint\", \"http://minio:9000\")\n", - " .config(\"spark.sql.defaultCatalog\", \"demo\")\n", - " .config(\"spark.eventLog.enabled\", \"true\")\n", - " .config(\"spark.eventLog.dir\", \"/home/iceberg/spark-events\")\n", - " .config(\"spark.history.fs.logDirectory\", \"/home/iceberg/spark-events\")\n", - " .getOrCreate();\n", - "\n", - "spark.sparkContext().setLogLevel(\"ERROR\");" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "0b17f820", - "metadata": {}, - "outputs": [], - "source": [ - "String query = \"INSERT INTO demo.nyc.logs \"\n", - " + \"VALUES \"\n", - " + \"('info', timestamp 'today', 'Just letting you know!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3')), \"\n", - " + \"('warning', timestamp 'today', 'You probably should not do this!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3')), \"\n", - " + \"('error', timestamp 'today', 'This was a fatal application error!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3'))\";\n", - "\n", - "spark.sql(query).show()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "15ca1822", - "metadata": {}, - "outputs": [], - "source": [ - "import org.apache.iceberg.catalog.Catalog;\n", - "import org.apache.hadoop.conf.Configuration;\n", - "import org.apache.iceberg.CatalogProperties;\n", - "import org.apache.iceberg.rest.RESTCatalog;\n", - "\n", - "Map properties = new HashMap<>();\n", - "\n", - "properties.put(CatalogProperties.CATALOG_IMPL, \"org.apache.iceberg.rest.RESTCatalog\");\n", - "properties.put(CatalogProperties.URI, \"http://rest:8181\");\n", - "properties.put(CatalogProperties.WAREHOUSE_LOCATION, \"s3a://warehouse/wh/\");\n", - "properties.put(CatalogProperties.FILE_IO_IMPL, \"org.apache.iceberg.aws.s3.S3FileIO\");\n", - "properties.put(S3FileIOProperties.ENDPOINT, \"http://minio:9000\");\n", - "\n", - "RESTCatalog catalog = new RESTCatalog();\n", - "Configuration conf = new Configuration();\n", - "catalog.setConf(conf);\n", - "catalog.initialize(\"demo\", properties);" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "3a5cf423", - "metadata": {}, - "outputs": [], - "source": [ - "import org.apache.iceberg.Table;\n", - "import org.apache.iceberg.TableScan;\n", - "import org.apache.iceberg.catalog.Namespace;\n", - "import org.apache.iceberg.catalog.TableIdentifier;\n", - "\n", - "Namespace nyc = Namespace.of(\"nyc\");\n", - "TableIdentifier name = TableIdentifier.of(nyc, \"logs\");\n", - "Table table = catalog.loadTable(name);" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "e472d6a1", - "metadata": {}, - "outputs": [], - "source": [ - "import org.apache.iceberg.io.CloseableIterable;\n", - "import org.apache.iceberg.data.Record;\n", - "import org.apache.iceberg.data.IcebergGenerics;\n", - "\n", - "CloseableIterable result = IcebergGenerics.read(table).build();" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "0d32f41c", - "metadata": {}, - "outputs": [], - "source": [ - "for (Record r: result) {\n", - " System.out.println(r);\n", - "}" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "7dffc238", - "metadata": {}, - "outputs": [], - "source": [ - "import org.apache.iceberg.expressions.Expressions;\n", - "\n", - "CloseableIterable result = IcebergGenerics.read(table)\n", - " .where(Expressions.equal(\"level\", \"error\"))\n", - " .build();" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "ec2b0431", - "metadata": {}, - "outputs": [], - "source": [ - "import org.apache.iceberg.CombinedScanTask;\n", - "import org.apache.iceberg.TableScan;\n", - "\n", - "TableScan scan = table.newScan();" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "09d13c6b", - "metadata": {}, - "outputs": [], - "source": [ - "import org.apache.iceberg.expressions.Expressions;\n", - "\n", - "TableScan filteredScan = scan.filter(Expressions.equal(\"level\", \"info\")).select(\"message\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "1857c10f", - "metadata": {}, - "outputs": [], - "source": [ - "Iterable result = filteredScan.planTasks();" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "ea206ec7", - "metadata": {}, - "outputs": [], - "source": [ - "import org.apache.iceberg.DataFile;\n", - "\n", - "CombinedScanTask task = result.iterator().next();\n", - "DataFile dataFile = task.files().iterator().next().file();\n", - "System.out.println(dataFile);" - ] - }, - { - "cell_type": "markdown", - "id": "41e9e10f", - "metadata": {}, - "source": [ - "## [Part 3 - Table Scans](https://tabular.io/blog/java-api-part-3/)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "81033412", - "metadata": {}, - "outputs": [], - "source": [ - "import org.apache.iceberg.Schema;\n", - "import org.apache.iceberg.types.Types;\n", - "import org.apache.iceberg.catalog.Namespace;\n", - "import org.apache.iceberg.catalog.TableIdentifier;\n", - "import org.apache.iceberg.PartitionSpec;\n", - "\n", - "Schema schema = new Schema(\n", - " Types.NestedField.optional(1, \"event_id\", Types.StringType.get()),\n", - " Types.NestedField.optional(2, \"username\", Types.StringType.get()),\n", - " Types.NestedField.optional(3, \"userid\", Types.IntegerType.get()),\n", - " Types.NestedField.optional(4, \"api_version\", Types.StringType.get()),\n", - " Types.NestedField.optional(5, \"command\", Types.StringType.get())\n", - " );\n", - "\n", - "Namespace webapp = Namespace.of(\"webapp\");\n", - "TableIdentifier name = TableIdentifier.of(webapp, \"user_events\");\n", - "catalog.createTable(name, schema, PartitionSpec.unpartitioned());" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "12c45c6b", - "metadata": {}, - "outputs": [], - "source": [ - "import java.util.UUID;\n", - "import com.google.common.collect.ImmutableList;\n", - "import com.google.common.collect.ImmutableMap;\n", - "import org.apache.iceberg.data.GenericRecord;\n", - "\n", - "GenericRecord record = GenericRecord.create(schema);\n", - "ImmutableList.Builder builder = ImmutableList.builder();\n", - "builder.add(record.copy(ImmutableMap.of(\"event_id\", UUID.randomUUID().toString(), \"username\", \"Bruce\", \"userid\", 1, \"api_version\", \"1.0\", \"command\", \"grapple\")));\n", - "builder.add(record.copy(ImmutableMap.of(\"event_id\", UUID.randomUUID().toString(), \"username\", \"Wayne\", \"userid\", 1, \"api_version\", \"1.0\", \"command\", \"glide\")));\n", - "builder.add(record.copy(ImmutableMap.of(\"event_id\", UUID.randomUUID().toString(), \"username\", \"Clark\", \"userid\", 1, \"api_version\", \"2.0\", \"command\", \"fly\")));\n", - "builder.add(record.copy(ImmutableMap.of(\"event_id\", UUID.randomUUID().toString(), \"username\", \"Kent\", \"userid\", 1, \"api_version\", \"1.0\", \"command\", \"land\")));\n", - "ImmutableList records = builder.build();" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "83bc5319", - "metadata": {}, - "outputs": [], - "source": [ - "import org.apache.iceberg.Files;\n", - "import org.apache.iceberg.io.DataWriter;\n", - "import org.apache.iceberg.io.OutputFile;\n", - "import org.apache.iceberg.parquet.Parquet;\n", - "import org.apache.iceberg.data.parquet.GenericParquetWriter;\n", - "\n", - "String filepath = table.location() + \"/\" + UUID.randomUUID().toString();\n", - "OutputFile file = table.io().newOutputFile(filepath);\n", - "DataWriter dataWriter =\n", - " Parquet.writeData(file)\n", - " .schema(schema)\n", - " .createWriterFunc(GenericParquetWriter::buildWriter)\n", - " .overwrite()\n", - " .withSpec(PartitionSpec.unpartitioned())\n", - " .build();\n", - "try {\n", - " for (GenericRecord record : builder.build()) {\n", - " dataWriter.write(record);\n", - " }\n", - "} finally {\n", - " dataWriter.close();\n", - "}" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "469e6af4", - "metadata": {}, - "outputs": [], - "source": [ - "import org.apache.iceberg.DataFile;\n", - "\n", - "DataFile dataFile = dataWriter.toDataFile();" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "142b6ed1", - "metadata": {}, - "outputs": [], - "source": [ - "import org.apache.iceberg.catalog.Namespace;\n", - "import org.apache.iceberg.catalog.TableIdentifier;\n", - "import org.apache.iceberg.Table;\n", - "\n", - "Namespace webapp = Namespace.of(\"webapp\");\n", - "TableIdentifier name = TableIdentifier.of(webapp, \"user_events\");\n", - "Table tbl = catalog.loadTable(name);\n", - "tbl.newAppend().appendFile(dataFile).commit()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "c61e9e79", - "metadata": {}, - "outputs": [], - "source": [ - "import org.apache.iceberg.io.CloseableIterable;\n", - "import org.apache.iceberg.data.Record;\n", - "import org.apache.iceberg.data.IcebergGenerics;\n", - "\n", - "CloseableIterable result = IcebergGenerics.read(tbl).build();\n", - "for (Record r: result) {\n", - " System.out.println(r);\n", - "}" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Java", - "language": "java", - "name": "java" - }, - "language_info": { - "codemirror_mode": "java", - "file_extension": ".jshell", - "mimetype": "text/x-java-source", - "name": "Java", - "pygments_lexer": "java", - "version": "11.0.15+10-post-Debian-1deb11u1" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/spark/notebooks/PyIceberg - Commits.ipynb b/spark/notebooks/PyIceberg - Commits.ipynb new file mode 100644 index 0000000..53624e3 --- /dev/null +++ b/spark/notebooks/PyIceberg - Commits.ipynb @@ -0,0 +1,286 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "1041ae6f", + "metadata": {}, + "source": [ + "![iceberg-logo](https://www.apache.org/logos/res/iceberg/iceberg.png)" + ] + }, + { + "cell_type": "markdown", + "id": "247fb2ab", + "metadata": {}, + "source": [ + "### [Docker, Spark, and Iceberg: The Fastest Way to Try Iceberg!](https://tabular.io/blog/docker-spark-and-iceberg/)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6a5c8206", + "metadata": {}, + "outputs": [], + "source": [ + "from pyiceberg import __version__\n", + "\n", + "__version__" + ] + }, + { + "cell_type": "markdown", + "id": "6f9a9f41", + "metadata": {}, + "source": [ + "# Write support\n", + "\n", + "This notebook demonstrates writing to Iceberg tables using PyIceberg. First, connect to the [catalog](https://iceberg.apache.org/concepts/catalog/#iceberg-catalogs), the place where tables are being tracked." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "47645b52", + "metadata": {}, + "outputs": [], + "source": [ + "from pyiceberg.catalog import load_catalog\n", + "\n", + "catalog = load_catalog('default')" + ] + }, + { + "cell_type": "markdown", + "id": "bf1d58ad-5cc1-4e8c-9d7b-a54e67def783", + "metadata": {}, + "source": [ + "# Create an Iceberg table\n", + "\n", + "Next create the Iceberg table directly from the `pyarrow.Table`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "47e5a21d-de87-4aaf-aa06-dc5048acba58", + "metadata": {}, + "outputs": [], + "source": [ + "table_name = \"default.commits\"\n", + "\n", + "try:\n", + " # In case the table already exists\n", + " catalog.drop_table(table_name)\n", + "except:\n", + " pass\n", + "\n", + "from pyiceberg.schema import Schema, NestedField, StringType, LongType\n", + "\n", + "schema = Schema(\n", + " NestedField(1, \"id\", LongType(), True),\n", + " NestedField(2, \"name\", StringType(), True),\n", + " NestedField(3, \"state\", StringType(), True),\n", + " NestedField(4, \"additions\", LongType(), True),\n", + " NestedField(5, \"deletes\", LongType(), True),\n", + " identifier_field_ids=[1]\n", + ")\n", + "\n", + "table = catalog.create_table(table_name, schema=schema)\n", + "\n", + "table" + ] + }, + { + "cell_type": "markdown", + "id": "c531bd4b-9943-4516-9a6a-99fab016ed2b", + "metadata": {}, + "source": [ + "# Loading data using Arrow\n", + "\n", + "Create an example PyArrow table that mimics data from the GitHub API." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9fddb808", + "metadata": {}, + "outputs": [], + "source": [ + "import pyarrow as pa\n", + "\n", + "from pyiceberg.io.pyarrow import schema_to_pyarrow\n", + "\n", + "pa_schema = schema_to_pyarrow(schema)\n", + "\n", + "df = pa.Table.from_pylist(\n", + " [\n", + " {\"id\": 123, \"name\": \"Fix bug\", \"state\": \"Open\", \"additions\": 22, \"deletes\": 10},\n", + " {\"id\": 234, \"name\": \"Add VariantType\", \"state\": \"Open\", \"additions\": 29123, \"deletes\": 302},\n", + " {\"id\": 345, \"name\": \"Add commit retries\", \"state\": \"Open\", \"additions\": 22, \"deletes\": 10},\n", + " ],\n", + " schema=pa_schema\n", + ")\n", + "\n", + "df" + ] + }, + { + "cell_type": "markdown", + "id": "d612c035-4cf6-47a0-844b-165dfb463bbc", + "metadata": {}, + "source": [ + "# Write the data\n", + "\n", + "Let's append the data to the table:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "efee8252", + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "table.append(df)\n", + "\n", + "assert len(table.scan().to_arrow()) == len(df)\n", + "\n", + "table.scan().to_pandas()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9ce1cecc-8cb0-4622-b0eb-55880d091556", + "metadata": {}, + "outputs": [], + "source": [ + "table.inspect.snapshots().to_pandas()" + ] + }, + { + "cell_type": "markdown", + "id": "c029ea44-8ba6-4c08-a60d-5fffac6c3666", + "metadata": {}, + "source": [ + "# Add moar data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "794de3a0", + "metadata": {}, + "outputs": [], + "source": [ + "table.append(pa.Table.from_pylist(\n", + " [\n", + " {\"id\": 456, \"name\": \"Add NanosecondTimestamps\", \"state\": \"Merged\", \"additions\": 2392, \"deletes\": 8},\n", + " {\"id\": 567, \"name\": \"Add documentation around filters\", \"state\": \"Open\", \"additions\": 7543, \"deletes\": 3},\n", + " ],\n", + " schema=pa_schema\n", + "))\n", + "\n", + "table.scan().to_pandas()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f3ac7021", + "metadata": {}, + "outputs": [], + "source": [ + "table.inspect.snapshots().to_pandas()" + ] + }, + { + "cell_type": "markdown", + "id": "d1b56466-1878-4391-b726-fca9d3b80705", + "metadata": {}, + "source": [ + "# Upsert new data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5157a36e-1ab2-4a5b-bf73-1f2d462b0002", + "metadata": {}, + "outputs": [], + "source": [ + "table.upsert(pa.Table.from_pylist(\n", + " [\n", + " # Nothing changes: No-op\n", + " {\"id\": 456, \"name\": \"Add NanosecondTimestamps\", \"state\": \"Merged\", \"additions\": 2392, \"deletes\": 8},\n", + "\n", + " # Updated, nc\n", + " {\"id\": 567, \"name\": \"Add documentation around filters\", \"state\": \"Merged\", \"additions\": 9238, \"deletes\": 22},\n", + " ],\n", + " schema=pa_schema\n", + "))\n", + "\n", + "table.scan().to_pandas()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cf7e1edd-7335-40c1-a50b-90c50536ae6f", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a69bba68-33c9-4c43-998d-e8e91a807341", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "90bed74a-05cd-471d-acf6-a76cb1e4d57c", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bde071db-f43c-4a7d-929e-a9d904effdc5", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.16" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/spark/notebooks/PyIceberg - Getting Started.ipynb b/spark/notebooks/PyIceberg - Getting Started.ipynb index e58f19c..a55bd2b 100644 --- a/spark/notebooks/PyIceberg - Getting Started.ipynb +++ b/spark/notebooks/PyIceberg - Getting Started.ipynb @@ -260,7 +260,8 @@ "%config SqlMagic.autopandas = True\n", "%config SqlMagic.feedback = False\n", "%config SqlMagic.displaycon = False\n", - "%sql duckdb:///:memory:" + "%sql duckdb:///:memory:\n", + "%sql set python_scan_all_frames=true" ] }, { diff --git a/spark/requirements.txt b/spark/requirements.txt index 48d3160..2669b9f 100644 --- a/spark/requirements.txt +++ b/spark/requirements.txt @@ -1,7 +1,10 @@ jupyter==1.1.1 spylon-kernel==0.4.1 pyiceberg[pyarrow,duckdb,pandas]==0.9.0 -jupysql==0.10.5 +jupysql==0.11.1 matplotlib==3.9.2 scipy==1.14.1 duckdb-engine==0.13.1 +grpcio==1.71.0 +pycolor==1.2.1 +papermill==2.6.0