|
116 | 116 | "In this example, PySpark performs a shuffle join behind the scenes to combine the two DataFrames. The process involves partitioning the data based on the join key (\"id\"), shuffling the partitions across the worker nodes, performing local joins on each worker node, and finally merging the results."
|
117 | 117 | ]
|
118 | 118 | },
|
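| | + { |
| | + "cell_type": "markdown", |
| | + "id": "a3f9c2d1", |
| | + "metadata": {}, |
| | + "source": [ |
| | + "For illustration, a minimal sketch of the kind of join described above. The DataFrames `df1` and `df2` and their contents are assumptions, not the original example:" |
| | + ] |
| | + }, |
| | + { |
| | + "cell_type": "code", |
| | + "execution_count": null, |
| | + "id": "b7e4d0c2", |
| | + "metadata": {}, |
| | + "outputs": [], |
| | + "source": [ |
| | + "# A minimal sketch, assuming two small DataFrames that share an \"id\" column.\n", |
| | + "df1 = spark.createDataFrame([(1, \"a\"), (2, \"b\")], [\"id\", \"value\"])\n", |
| | + "df2 = spark.createDataFrame([(1, \"x\"), (2, \"y\")], [\"id\", \"extra\"])\n", |
| | + "\n", |
| | + "# Joining on \"id\" is what triggers the shuffle described above; note that\n", |
| | + "# inputs this small may be broadcast-joined instead of shuffled.\n", |
| | + "df1.join(df2, on=\"id\", how=\"inner\").show()" |
| | + ] |
| | + }, |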
| 119 | + { |
| 120 | + "cell_type": "markdown", |
| 121 | + "id": "549e0e0a", |
| 122 | + "metadata": {}, |
| 123 | + "source": [ |
| 124 | + "### PySpark DataFrame Transformations: select vs withColumn" |
| 125 | + ] |
| 126 | + }, |
| 127 | + { |
| 128 | + "cell_type": "code", |
| 129 | + "execution_count": null, |
| 130 | + "id": "322859d8", |
| 131 | + "metadata": { |
| 132 | + "tags": [ |
| 133 | + "hide-cell" |
| 134 | + ] |
| 135 | + }, |
| 136 | + "outputs": [], |
| 137 | + "source": [ |
| 138 | + "!pip install 'pyspark[sql]'" |
| 139 | + ] |
| 140 | + }, |
| 141 | + { |
| 142 | + "cell_type": "code", |
| 143 | + "execution_count": 1, |
| 144 | + "id": "edbf287e", |
| 145 | + "metadata": { |
| 146 | + "tags": [ |
| 147 | + "hide-cell" |
| 148 | + ] |
| 149 | + }, |
| 150 | + "outputs": [ |
| 151 | + { |
| 152 | + "name": "stderr", |
| 153 | + "output_type": "stream", |
| 154 | + "text": [ |
| 155 | + "Setting default log level to \"WARN\".\n", |
| 156 | + "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", |
| 157 | + "24/09/02 06:01:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n" |
| 158 | + ] |
| 159 | + } |
| 160 | + ], |
| 161 | + "source": [ |
| 162 | + "from pyspark.sql import SparkSession\n", |
| 163 | + "\n", |
| 164 | + "spark = SparkSession.builder.getOrCreate()" |
| 165 | + ] |
| 166 | + }, |
| 167 | + { |
| 168 | + "cell_type": "markdown", |
| 169 | + "id": "0b1dbabe", |
| 170 | + "metadata": {}, |
| 171 | + "source": [ |
| 172 | + "PySpark's `select` and `withColumn` can both be used to add new columns or modify existing ones. However, their behavior differs.\n", |
| 173 | + "\n", |
| 174 | + "To demonstrate this, let's start by creating a sample DataFrame:" |
| 175 | + ] |
| 176 | + }, |
| 177 | + { |
| 178 | + "cell_type": "code", |
| 179 | + "execution_count": 14, |
| 180 | + "id": "9561afc6", |
| 181 | + "metadata": {}, |
| 182 | + "outputs": [ |
| 183 | + { |
| 184 | + "name": "stdout", |
| 185 | + "output_type": "stream", |
| 186 | + "text": [ |
| 187 | + "+-----+---+-------------+\n", |
| 188 | + "| name|age| city|\n", |
| 189 | + "+-----+---+-------------+\n", |
| 190 | + "|Alice| 28| New York|\n", |
| 191 | + "| Bob| 35|San Francisco|\n", |
| 192 | + "+-----+---+-------------+\n", |
| 193 | + "\n" |
| 194 | + ] |
| 195 | + } |
| 196 | + ], |
| 197 | + "source": [ |
| 198 | + "from pyspark.sql.functions import col, upper\n", |
| 199 | + "\n", |
| 200 | + "\n", |
| 201 | + "data = [\n", |
| 202 | + " (\"Alice\", 28, \"New York\"),\n", |
| 203 | + " (\"Bob\", 35, \"San Francisco\"),\n", |
| 204 | + "]\n", |
| 205 | + "df = spark.createDataFrame(data, [\"name\", \"age\", \"city\"])\n", |
| 206 | + "df.show()" |
| 207 | + ] |
| 208 | + }, |
| 209 | + { |
| 210 | + "cell_type": "markdown", |
| 211 | + "id": "07eba50a", |
| 212 | + "metadata": {}, |
| 213 | + "source": [ |
| 214 | + "\n", |
| 215 | + "`select` keeps only the columns you specify and drops everything else." |
| 216 | + ] |
| 217 | + }, |
| 218 | + { |
| 219 | + "cell_type": "code", |
| 220 | + "execution_count": 17, |
| 221 | + "id": "278dac0c", |
| 222 | + "metadata": {}, |
| 223 | + "outputs": [ |
| 224 | + { |
| 225 | + "name": "stdout", |
| 226 | + "output_type": "stream", |
| 227 | + "text": [ |
| 228 | + "+-------------+\n", |
| 229 | + "| upper_city|\n", |
| 230 | + "+-------------+\n", |
| 231 | + "| NEW YORK|\n", |
| 232 | + "|SAN FRANCISCO|\n", |
| 233 | + "+-------------+\n", |
| 234 | + "\n" |
| 235 | + ] |
| 236 | + } |
| 237 | + ], |
| 238 | + "source": [ |
| 239 | + "df_select = df.select(upper(col(\"city\")).alias(\"upper_city\"))\n", |
| 240 | + "df_select.show()" |
| 241 | + ] |
| 242 | + }, |
| 243 | + { |
| 244 | + "cell_type": "markdown", |
| 245 | + "id": "3f1dec31", |
| 246 | + "metadata": {}, |
| 247 | + "source": [ |
| 248 | + "`withColumn` retains all original columns plus the new/modified one." |
| 249 | + ] |
| 250 | + }, |
| 251 | + { |
| 252 | + "cell_type": "code", |
| 253 | + "execution_count": 16, |
| 254 | + "id": "3b0a9103", |
| 255 | + "metadata": {}, |
| 256 | + "outputs": [ |
| 257 | + { |
| 258 | + "name": "stdout", |
| 259 | + "output_type": "stream", |
| 260 | + "text": [ |
| 261 | + "+-----+---+-------------+-------------+\n", |
| 262 | + "| name|age| city| upper_city|\n", |
| 263 | + "+-----+---+-------------+-------------+\n", |
| 264 | + "|Alice| 28| New York| NEW YORK|\n", |
| 265 | + "| Bob| 35|San Francisco|SAN FRANCISCO|\n", |
| 266 | + "+-----+---+-------------+-------------+\n", |
| 267 | + "\n" |
| 268 | + ] |
| 269 | + } |
| 270 | + ], |
| 271 | + "source": [ |
| 272 | + "df_withColumn = df.withColumn(\"upper_city\", upper(col(\"city\")))\n", |
| 273 | + "df_withColumn.show()" |
| 274 | + ] |
| 275 | + }, |
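| | + { |
| | + "cell_type": "markdown", |
| | + "id": "c1d2e3f4", |
| | + "metadata": {}, |
| | + "source": [ |
| | + "As a side note (a sketch, not part of the original comparison): `select` can reproduce the `withColumn` result by selecting `\"*\"` alongside the derived column." |
| | + ] |
| | + }, |
| | + { |
| | + "cell_type": "code", |
| | + "execution_count": null, |
| | + "id": "d4e5f6a7", |
| | + "metadata": {}, |
| | + "outputs": [], |
| | + "source": [ |
| | + "# Equivalent to the withColumn call above: \"*\" keeps every existing\n", |
| | + "# column, and the aliased expression appends the derived one.\n", |
| | + "df_equivalent = df.select(\"*\", upper(col(\"city\")).alias(\"upper_city\"))\n", |
| | + "df_equivalent.show()" |
| | + ] |
| | + }, |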
119 | 276 | {
|
120 | 277 | "cell_type": "markdown",
|
121 | 278 | "id": "d8dc8623",
|
|