Commit 2b21847

update spark
1 parent cb15cd1 commit 2b21847

4 files changed: +244 −148 lines

Chapter5/spark.ipynb

Lines changed: 91 additions & 55 deletions
@@ -1099,7 +1099,7 @@
 "id": "a2d96783",
 "metadata": {},
 "source": [
-"### PySpark SQL: Enhancing Reusability with Parameterized Queries"
+"### Writing Safer and Cleaner Spark SQL with PySpark's Parameterized Queries"
 ]
 },
 {
@@ -1117,120 +1117,156 @@
 ]
 },
 {
-"cell_type": "markdown",
-"id": "0ddc2bc2",
+"cell_type": "code",
+"execution_count": 12,
+"id": "8056b8af",
 "metadata": {},
+"outputs": [],
 "source": [
-"In PySpark, parametrized queries enable the same query structure to be reused with different inputs, without rewriting the SQL.\n",
+"from pyspark.sql import SparkSession\n",
+"import pandas as pd\n",
+"from datetime import date, timedelta\n",
 "\n",
-"Additionally, they safeguard against SQL injection attacks by treating input data as parameters rather than as executable code."
+"spark = SparkSession.builder.getOrCreate()"
 ]
 },
 {
-"cell_type": "code",
-"execution_count": null,
-"id": "8056b8af",
+"cell_type": "markdown",
+"id": "0ddc2bc2",
 "metadata": {},
-"outputs": [],
 "source": [
-"from pyspark.sql import SparkSession\n",
-"import pandas as pd \n",
+"When working with Spark SQL queries, using regular Python string interpolation can lead to security vulnerabilities and require extra steps like creating temporary views. PySpark offers a better solution with parameterized queries, which:\n",
 "\n",
-"spark = SparkSession.builder.getOrCreate()"
+"- Protect against SQL injection\n",
+"- Allow using DataFrame objects directly in queries\n",
+"- Automatically handle date formatting\n",
+"- Provide a more expressive way to write SQL queries\n",
+"\n",
+"Let's compare the traditional approach with parameterized queries:"
 ]
 },
 {
 "cell_type": "code",
-"execution_count": 3,
+"execution_count": 13,
 "id": "cc5f3c19",
 "metadata": {},
 "outputs": [
-{
-"name": "stderr",
-"output_type": "stream",
-"text": [
-" \r"
-]
-},
 {
 "name": "stdout",
 "output_type": "stream",
 "text": [
-"+-------+-----+\n",
-"|item_id|price|\n",
-"+-------+-----+\n",
-"|      1|    4|\n",
-"|      2|    2|\n",
-"|      3|    5|\n",
-"|      4|    1|\n",
-"+-------+-----+\n",
+"+-------+-----+----------------+\n",
+"|item_id|price|transaction_date|\n",
+"+-------+-----+----------------+\n",
+"|      1|    4|      2023-01-15|\n",
+"|      2|    2|      2023-02-01|\n",
+"|      3|    5|      2023-03-10|\n",
+"|      4|    1|      2023-04-22|\n",
+"+-------+-----+----------------+\n",
 "\n"
 ]
 }
 ],
 "source": [
 "# Create a Spark DataFrame\n",
-"item_price_pandas = pd.DataFrame({\"item_id\": [1, 2, 3, 4], \"price\": [4, 2, 5, 1]})\n",
+"item_price_pandas = pd.DataFrame({\n",
+"    \"item_id\": [1, 2, 3, 4],\n",
+"    \"price\": [4, 2, 5, 1],\n",
+"    \"transaction_date\": [\n",
+"        date(2023, 1, 15),\n",
+"        date(2023, 2, 1),\n",
+"        date(2023, 3, 10),\n",
+"        date(2023, 4, 22)\n",
+"    ]\n",
+"})\n",
+"\n",
 "item_price = spark.createDataFrame(item_price_pandas)\n",
 "item_price.show()"
 ]
 },
+{
+"cell_type": "markdown",
+"id": "fcfcc76a-5b3e-41b3-819f-14adf8576061",
+"metadata": {},
+"source": [
+"Traditional approach (less secure, requires temp view and wrapping the date in quotes):"
+]
+},
 {
 "cell_type": "code",
-"execution_count": 16,
-"id": "90976e5b",
+"execution_count": 19,
+"id": "451c6d69-8f0d-4b5f-a030-873ed6c5295e",
 "metadata": {},
 "outputs": [
-{
-"name": "stderr",
-"output_type": "stream",
-"text": [
-" \r"
-]
-},
 {
 "name": "stdout",
 "output_type": "stream",
 "text": [
-"+-------+-----+\n",
-"|item_id|price|\n",
-"+-------+-----+\n",
-"|      1|    4|\n",
-"+-------+-----+\n",
+"+-------+-----+----------------+\n",
+"|item_id|price|transaction_date|\n",
+"+-------+-----+----------------+\n",
+"|      3|    5|      2023-03-10|\n",
+"|      4|    1|      2023-04-22|\n",
+"+-------+-----+----------------+\n",
 "\n"
 ]
 }
 ],
 "source": [
-"query = \"\"\"SELECT item_id, price \n",
-"FROM {item_price} \n",
-"WHERE item_id = {id_val} \n",
+"item_price.createOrReplaceTempView(\"item_price_view\")\n",
+"transaction_date = \"2023-02-15\"\n",
+"\n",
+"query = f\"\"\"SELECT *\n",
+"FROM item_price_view \n",
+"WHERE transaction_date > '{transaction_date}'\n",
 "\"\"\"\n",
 "\n",
-"spark.sql(query, id_val=1, item_price=item_price).show()"
+"spark.sql(query).show()"
+]
+},
+{
+"cell_type": "markdown",
+"id": "d92eecf2-e753-4d4c-8122-713aa160fd98",
+"metadata": {},
+"source": [
+"PySpark's parameterized query approach (secure, no temp view and quotes needed):"
 ]
 },
 {
 "cell_type": "code",
-"execution_count": 17,
-"id": "44634ce8",
+"execution_count": 20,
+"id": "90976e5b",
 "metadata": {},
 "outputs": [
 {
 "name": "stdout",
 "output_type": "stream",
 "text": [
-"+-------+-----+\n",
-"|item_id|price|\n",
-"+-------+-----+\n",
-"|      2|    2|\n",
-"+-------+-----+\n",
+"+-------+-----+----------------+\n",
+"|item_id|price|transaction_date|\n",
+"+-------+-----+----------------+\n",
+"|      3|    5|      2023-03-10|\n",
+"|      4|    1|      2023-04-22|\n",
+"+-------+-----+----------------+\n",
 "\n"
 ]
 }
 ],
 "source": [
-"spark.sql(query, id_val=2, item_price=item_price).show()"
+"query = \"\"\"SELECT *\n",
+"FROM {item_price} \n",
+"WHERE transaction_date > {transaction_date}\n",
+"\"\"\"\n",
+"\n",
+"spark.sql(query, item_price=item_price, transaction_date=transaction_date).show()"
+]
+},
+{
+"cell_type": "markdown",
+"id": "86a79ac8-70d0-458d-a4ac-4d32e897d5d2",
+"metadata": {},
+"source": [
+"This method allows for easy parameter substitution and direct use of DataFrames, making your Spark SQL queries both safer and more convenient to write and maintain."
 ]
 },
 {
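For reference outside the notebook JSON, here is the comparison this commit introduces, reassembled as a minimal standalone sketch. It is not part of the commit itself, and it assumes a PySpark version recent enough that spark.sql() accepts DataFrames and Python values as keyword arguments for {name} placeholders, which is the API the new cells rely on.

from datetime import date

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data with a date column, as in the updated notebook cell.
item_price = spark.createDataFrame(pd.DataFrame({
    "item_id": [1, 2, 3, 4],
    "price": [4, 2, 5, 1],
    "transaction_date": [
        date(2023, 1, 15),
        date(2023, 2, 1),
        date(2023, 3, 10),
        date(2023, 4, 22),
    ],
}))

transaction_date = "2023-02-15"

# Traditional approach: f-string interpolation. Needs a temp view,
# manual quoting of the date, and is open to SQL injection.
item_price.createOrReplaceTempView("item_price_view")
spark.sql(f"""SELECT *
FROM item_price_view
WHERE transaction_date > '{transaction_date}'
""").show()

# Parameterized approach: the DataFrame and the date are passed as
# keyword arguments and substituted safely; no temp view or quoting.
query = """SELECT *
FROM {item_price}
WHERE transaction_date > {transaction_date}
"""
spark.sql(query, item_price=item_price, transaction_date=transaction_date).show()

Both versions print the two rows dated after 2023-02-15; only the parameterized one stays safe when transaction_date comes from untrusted input.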
