Commit 017feeb

add udf reusability
1 parent b1e03b7 commit 017feeb

4 files changed: +560 −1 lines changed

Chapter5/spark.ipynb

Lines changed: 199 additions & 0 deletions
@@ -1154,6 +1154,205 @@
 "source": [
 "[Learn more about PySpark UDFs](https://bit.ly/3TEYPHh)."
 ]
+},
+{
+"cell_type": "markdown",
+"id": "e85b6d13",
+"metadata": {},
+"source": [
+"### Leverage Spark UDFs for Reusable Complex Logic in SQL Queries"
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"id": "2ef1fce5",
+"metadata": {
+"tags": [
+"hide-cell"
+]
+},
+"outputs": [],
+"source": [
+"!pip install \"pyspark[sql]\""
+]
+},
+{
+"cell_type": "code",
+"execution_count": 6,
+"id": "0230a6da",
+"metadata": {
+"tags": [
+"hide-cell"
+]
+},
+"outputs": [],
+"source": [
+"from pyspark.sql import SparkSession\n",
+"from pyspark.sql.functions import udf, col\n",
+"from pyspark.sql.types import StringType\n",
+"\n",
+"# Create SparkSession\n",
+"spark = SparkSession.builder.getOrCreate()"
+]
+},
+{
+"cell_type": "markdown",
+"id": "c71ec91f",
+"metadata": {},
+"source": [
+"Duplicated code in SQL queries can lead to inconsistencies if changes are made to one instance of the duplicated code but not to others."
+]
+},
+{
+"cell_type": "code",
+"execution_count": 7,
+"id": "0f892003",
+"metadata": {},
+"outputs": [
+{
+"name": "stdout",
+"output_type": "stream",
+"text": [
+"+---------+-----+--------+--------+\n",
+"|     name|price|quantity|category|\n",
+"+---------+-----+--------+--------+\n",
+"|Product 1| 10.0|       5|  Medium|\n",
+"|Product 2| 15.0|       3|    High|\n",
+"|Product 3|  8.0|       2|     Low|\n",
+"+---------+-----+--------+--------+\n",
+"\n",
+"+---------+--------+\n",
+"|     name|category|\n",
+"+---------+--------+\n",
+"|Product 1|  Medium|\n",
+"+---------+--------+\n",
+"\n"
+]
+}
+],
+"source": [
+"# Sample DataFrame\n",
+"df = spark.createDataFrame(\n",
+"    [(\"Product 1\", 10.0, 5), (\"Product 2\", 15.0, 3), (\"Product 3\", 8.0, 2)],\n",
+"    [\"name\", \"price\", \"quantity\"],\n",
+")\n",
+"\n",
+"# Use df within Spark SQL queries\n",
+"df.createOrReplaceTempView(\"products\")\n",
+"\n",
+"# Select Statement 1\n",
+"result1 = spark.sql(\n",
+"    \"\"\"\n",
+"    SELECT name, price, quantity,\n",
+"    CASE\n",
+"        WHEN price < 10.0 THEN 'Low'\n",
+"        WHEN price >= 10.0 AND price < 15.0 THEN 'Medium'\n",
+"        ELSE 'High'\n",
+"    END AS category\n",
+"    FROM products\n",
+"\"\"\"\n",
+")\n",
+"\n",
+"# Select Statement 2\n",
+"result2 = spark.sql(\n",
+"    \"\"\"\n",
+"    SELECT name,\n",
+"    CASE\n",
+"        WHEN price < 10.0 THEN 'Low'\n",
+"        WHEN price >= 10.0 AND price < 15.0 THEN 'Medium'\n",
+"        ELSE 'High'\n",
+"    END AS category\n",
+"    FROM products\n",
+"    WHERE quantity > 3\n",
+"\"\"\"\n",
+")\n",
+"\n",
+"# Display the results\n",
+"result1.show()\n",
+"result2.show()"
+]
+},
+{
+"cell_type": "markdown",
+"id": "f3ba0e79",
+"metadata": {},
+"source": [
+"Spark UDFs (User-Defined Functions) can help address these issues by encapsulating complex logic that is reused across multiple SQL queries.\n",
+"\n",
+"In the code example below, we define a UDF `assign_category_label` that assigns category labels based on price. This UDF is then reused in two different SQL statements."
+]
+},
+{
+"cell_type": "code",
+"execution_count": 10,
+"id": "37f4d9c4",
+"metadata": {},
+"outputs": [
+{
+"name": "stderr",
+"output_type": "stream",
+"text": [
+"24/04/15 09:28:11 WARN SimpleFunctionRegistry: The function assign_category_label replaced a previously registered function.\n"
+]
+},
+{
+"name": "stdout",
+"output_type": "stream",
+"text": [
+"+---------+-----+--------+--------+\n",
+"|     name|price|quantity|category|\n",
+"+---------+-----+--------+--------+\n",
+"|Product 1| 10.0|       5|  Medium|\n",
+"|Product 2| 15.0|       3|    High|\n",
+"|Product 3|  8.0|       2|     Low|\n",
+"+---------+-----+--------+--------+\n",
+"\n",
+"+---------+--------+\n",
+"|     name|category|\n",
+"+---------+--------+\n",
+"|Product 1|  Medium|\n",
+"+---------+--------+\n",
+"\n"
+]
+}
+],
+"source": [
+"# Define UDF to assign category label based on price\n",
+"@udf(returnType=StringType())\n",
+"def assign_category_label(price):\n",
+"    if price < 10.0:\n",
+"        return \"Low\"\n",
+"    elif price >= 10.0 and price < 15.0:\n",
+"        return \"Medium\"\n",
+"    else:\n",
+"        return \"High\"\n",
+"\n",
+"\n",
+"# Register UDF\n",
+"spark.udf.register(\"assign_category_label\", assign_category_label)\n",
+"\n",
+"# Select Statement 1\n",
+"result1 = spark.sql(\n",
+"    \"\"\"\n",
+"    SELECT name, price, quantity, assign_category_label(price) AS category\n",
+"    FROM products\n",
+"\"\"\"\n",
+")\n",
+"\n",
+"# Select Statement 2\n",
+"result2 = spark.sql(\n",
+"    \"\"\"\n",
+"    SELECT name, assign_category_label(price) AS category\n",
+"    FROM products\n",
+"    WHERE quantity > 3\n",
+"\"\"\"\n",
+")\n",
+"\n",
+"# Display the results\n",
+"result1.show()\n",
+"result2.show()"
+]
 }
 ],
 "metadata": {
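One property of the UDF this commit adds is that its body is plain Python, so the categorization rule can be unit-tested without starting a SparkSession. A minimal sketch of that split (illustrative only, not part of the commit):

```python
# Hypothetical refactor: keep the labeling rule as an ordinary Python
# function that is testable without Spark, and wrap it as a UDF only at
# registration time. The rule below matches the commit's UDF body.
def assign_category_label(price: float) -> str:
    if price < 10.0:
        return "Low"
    elif price < 15.0:  # i.e. 10.0 <= price < 15.0
        return "Medium"
    else:
        return "High"


print([assign_category_label(p) for p in (10.0, 15.0, 8.0)])  # ['Medium', 'High', 'Low']
```

With a SparkSession available, `spark.udf.register("assign_category_label", assign_category_label, StringType())` would then expose the same callable to SQL, matching what the commit does.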

docs/Chapter5/spark.html

Lines changed: 161 additions & 0 deletions
@@ -517,6 +517,7 @@ <h2> Contents </h2>
 <li class="toc-h2 nav-item toc-entry"><a class="reference internal nav-link" href="#pyspark-sql-enhancing-reusability-with-parameterized-queries">6.15.3. PySpark SQL: Enhancing Reusability with Parameterized Queries</a></li>
 <li class="toc-h2 nav-item toc-entry"><a class="reference internal nav-link" href="#working-with-arrays-made-easier-in-spark-3-5">6.15.4. Working with Arrays Made Easier in Spark 3.5</a></li>
 <li class="toc-h2 nav-item toc-entry"><a class="reference internal nav-link" href="#simplify-complex-sql-queries-with-pyspark-udfs">6.15.5. Simplify Complex SQL Queries with PySpark UDFs</a></li>
+<li class="toc-h2 nav-item toc-entry"><a class="reference internal nav-link" href="#leverage-spark-udfs-for-reusable-complex-logic-in-sql-queries">6.15.6. Leverage Spark UDFs for Reusable Complex Logic in SQL Queries</a></li>
 </ul>
 </nav>
 </div>
@@ -1292,6 +1293,165 @@ <h2><span class="section-number">6.15.5. </span>Simplify Complex SQL Queries wit
 </div>
 <p><a class="reference external" href="https://bit.ly/3TEYPHh">Learn more about PySpark UDFs</a>.</p>
 </section>
+<section id="leverage-spark-udfs-for-reusable-complex-logic-in-sql-queries">
+<h2><span class="section-number">6.15.6. </span>Leverage Spark UDFs for Reusable Complex Logic in SQL Queries<a class="headerlink" href="#leverage-spark-udfs-for-reusable-complex-logic-in-sql-queries" title="Permalink to this heading">#</a></h2>
+<div class="cell tag_hide-cell docutils container">
+<details class="hide above-input">
+<summary aria-label="Toggle hidden content">
+<span class="collapsed">Show code cell content</span>
+<span class="expanded">Hide code cell content</span>
+</summary>
+<div class="cell_input docutils container">
+<div class="highlight-ipython3 notranslate"><div class="highlight"><pre><span></span><span class="o">!</span>pip<span class="w"> </span>install<span class="w"> </span><span class="s2">&quot;pyspark[sql]&quot;</span>
+</pre></div>
+</div>
+</div>
+</details>
+</div>
+<div class="cell tag_hide-cell docutils container">
+<details class="hide above-input">
+<summary aria-label="Toggle hidden content">
+<span class="collapsed">Show code cell content</span>
+<span class="expanded">Hide code cell content</span>
+</summary>
+<div class="cell_input docutils container">
+<div class="highlight-ipython3 notranslate"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SparkSession</span>
+<span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">udf</span><span class="p">,</span> <span class="n">col</span>
+<span class="kn">from</span> <span class="nn">pyspark.sql.types</span> <span class="kn">import</span> <span class="n">StringType</span>
+
+<span class="c1"># Create SparkSession</span>
+<span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span><span class="o">.</span><span class="n">builder</span><span class="o">.</span><span class="n">getOrCreate</span><span class="p">()</span>
+</pre></div>
+</div>
+</div>
+</details>
+</div>
+<p>Duplicated code in SQL queries can lead to inconsistencies if changes are made to one instance of the duplicated code but not to others.</p>
+<div class="cell docutils container">
+<div class="cell_input docutils container">
+<div class="highlight-ipython3 notranslate"><div class="highlight"><pre><span></span><span class="c1"># Sample DataFrame</span>
+<span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">createDataFrame</span><span class="p">(</span>
+    <span class="p">[(</span><span class="s2">&quot;Product 1&quot;</span><span class="p">,</span> <span class="mf">10.0</span><span class="p">,</span> <span class="mi">5</span><span class="p">),</span> <span class="p">(</span><span class="s2">&quot;Product 2&quot;</span><span class="p">,</span> <span class="mf">15.0</span><span class="p">,</span> <span class="mi">3</span><span class="p">),</span> <span class="p">(</span><span class="s2">&quot;Product 3&quot;</span><span class="p">,</span> <span class="mf">8.0</span><span class="p">,</span> <span class="mi">2</span><span class="p">)],</span>
+    <span class="p">[</span><span class="s2">&quot;name&quot;</span><span class="p">,</span> <span class="s2">&quot;price&quot;</span><span class="p">,</span> <span class="s2">&quot;quantity&quot;</span><span class="p">],</span>
+<span class="p">)</span>
+
+<span class="c1"># Use df within Spark SQL queries</span>
+<span class="n">df</span><span class="o">.</span><span class="n">createOrReplaceTempView</span><span class="p">(</span><span class="s2">&quot;products&quot;</span><span class="p">)</span>
+
+<span class="c1"># Select Statement 1</span>
+<span class="n">result1</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span>
+<span class="w">    </span><span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">    SELECT name, price, quantity,</span>
+<span class="sd">    CASE</span>
+<span class="sd">        WHEN price &lt; 10.0 THEN &#39;Low&#39;</span>
+<span class="sd">        WHEN price &gt;= 10.0 AND price &lt; 15.0 THEN &#39;Medium&#39;</span>
+<span class="sd">        ELSE &#39;High&#39;</span>
+<span class="sd">    END AS category</span>
+<span class="sd">    FROM products</span>
+<span class="sd">&quot;&quot;&quot;</span>
+<span class="p">)</span>
+
+<span class="c1"># Select Statement 2</span>
+<span class="n">result2</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span>
+<span class="w">    </span><span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">    SELECT name,</span>
+<span class="sd">    CASE</span>
+<span class="sd">        WHEN price &lt; 10.0 THEN &#39;Low&#39;</span>
+<span class="sd">        WHEN price &gt;= 10.0 AND price &lt; 15.0 THEN &#39;Medium&#39;</span>
+<span class="sd">        ELSE &#39;High&#39;</span>
+<span class="sd">    END AS category</span>
+<span class="sd">    FROM products</span>
+<span class="sd">    WHERE quantity &gt; 3</span>
+<span class="sd">&quot;&quot;&quot;</span>
+<span class="p">)</span>
+
+<span class="c1"># Display the results</span>
+<span class="n">result1</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
+<span class="n">result2</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
+</pre></div>
+</div>
+</div>
+<div class="cell_output docutils container">
+<div class="output stream highlight-myst-ansi notranslate"><div class="highlight"><pre><span></span>+---------+-----+--------+--------+
+|     name|price|quantity|category|
++---------+-----+--------+--------+
+|Product 1| 10.0|       5|  Medium|
+|Product 2| 15.0|       3|    High|
+|Product 3|  8.0|       2|     Low|
++---------+-----+--------+--------+
+
++---------+--------+
+|     name|category|
++---------+--------+
+|Product 1|  Medium|
++---------+--------+
+</pre></div>
+</div>
+</div>
+</div>
+<p>Spark UDFs (User-Defined Functions) can help address these issues by encapsulating complex logic that is reused across multiple SQL queries.</p>
+<p>In the code example below, we define a UDF <code class="docutils literal notranslate"><span class="pre">assign_category_label</span></code> that assigns category labels based on price. This UDF is then reused in two different SQL statements.</p>
+<div class="cell docutils container">
+<div class="cell_input docutils container">
+<div class="highlight-ipython3 notranslate"><div class="highlight"><pre><span></span><span class="c1"># Define UDF to assign category label based on price</span>
+<span class="nd">@udf</span><span class="p">(</span><span class="n">returnType</span><span class="o">=</span><span class="n">StringType</span><span class="p">())</span>
+<span class="k">def</span> <span class="nf">assign_category_label</span><span class="p">(</span><span class="n">price</span><span class="p">):</span>
+    <span class="k">if</span> <span class="n">price</span> <span class="o">&lt;</span> <span class="mf">10.0</span><span class="p">:</span>
+        <span class="k">return</span> <span class="s2">&quot;Low&quot;</span>
+    <span class="k">elif</span> <span class="n">price</span> <span class="o">&gt;=</span> <span class="mf">10.0</span> <span class="ow">and</span> <span class="n">price</span> <span class="o">&lt;</span> <span class="mf">15.0</span><span class="p">:</span>
+        <span class="k">return</span> <span class="s2">&quot;Medium&quot;</span>
+    <span class="k">else</span><span class="p">:</span>
+        <span class="k">return</span> <span class="s2">&quot;High&quot;</span>
+
+
+<span class="c1"># Register UDF</span>
+<span class="n">spark</span><span class="o">.</span><span class="n">udf</span><span class="o">.</span><span class="n">register</span><span class="p">(</span><span class="s2">&quot;assign_category_label&quot;</span><span class="p">,</span> <span class="n">assign_category_label</span><span class="p">)</span>
+
+<span class="c1"># Select Statement 1</span>
+<span class="n">result1</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span>
+<span class="w">    </span><span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">    SELECT name, price, quantity, assign_category_label(price) AS category</span>
+<span class="sd">    FROM products</span>
+<span class="sd">&quot;&quot;&quot;</span>
+<span class="p">)</span>
+
+<span class="c1"># Select Statement 2</span>
+<span class="n">result2</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="n">sql</span><span class="p">(</span>
+<span class="w">    </span><span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">    SELECT name, assign_category_label(price) AS category</span>
+<span class="sd">    FROM products</span>
+<span class="sd">    WHERE quantity &gt; 3</span>
+<span class="sd">&quot;&quot;&quot;</span>
+<span class="p">)</span>
+
+<span class="c1"># Display the results</span>
+<span class="n">result1</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
+<span class="n">result2</span><span class="o">.</span><span class="n">show</span><span class="p">()</span>
+</pre></div>
+</div>
+</div>
+<div class="cell_output docutils container">
+<div class="output stderr highlight-myst-ansi notranslate"><div class="highlight"><pre><span></span>24/04/15 09:28:11 WARN SimpleFunctionRegistry: The function assign_category_label replaced a previously registered function.
+</pre></div>
+</div>
+<div class="output stream highlight-myst-ansi notranslate"><div class="highlight"><pre><span></span>+---------+-----+--------+--------+
+|     name|price|quantity|category|
++---------+-----+--------+--------+
+|Product 1| 10.0|       5|  Medium|
+|Product 2| 15.0|       3|    High|
+|Product 3|  8.0|       2|     Low|
++---------+-----+--------+--------+
+
++---------+--------+
+|     name|category|
++---------+--------+
+|Product 1|  Medium|
++---------+--------+
+</pre></div>
+</div>
+</div>
+</div>
+</section>
 </section>

 <script type="text/x-thebe-config">
@@ -1362,6 +1522,7 @@ <h2><span class="section-number">6.15.5. </span>Simplify Complex SQL Queries wit
 <li class="toc-h2 nav-item toc-entry"><a class="reference internal nav-link" href="#pyspark-sql-enhancing-reusability-with-parameterized-queries">6.15.3. PySpark SQL: Enhancing Reusability with Parameterized Queries</a></li>
 <li class="toc-h2 nav-item toc-entry"><a class="reference internal nav-link" href="#working-with-arrays-made-easier-in-spark-3-5">6.15.4. Working with Arrays Made Easier in Spark 3.5</a></li>
 <li class="toc-h2 nav-item toc-entry"><a class="reference internal nav-link" href="#simplify-complex-sql-queries-with-pyspark-udfs">6.15.5. Simplify Complex SQL Queries with PySpark UDFs</a></li>
+<li class="toc-h2 nav-item toc-entry"><a class="reference internal nav-link" href="#leverage-spark-udfs-for-reusable-complex-logic-in-sql-queries">6.15.6. Leverage Spark UDFs for Reusable Complex Logic in SQL Queries</a></li>
 </ul>
 </nav></div>
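Worth noting alongside this commit: a Python UDF is opaque to Spark's Catalyst optimizer and pays per-row serialization overhead, so another way to remove the duplicated CASE logic is to keep the native SQL expression in a single Python constant and interpolate it into each query. This is an illustrative alternative, not something the commit does:

```python
# Illustrative alternative: share the native CASE expression instead of a UDF.
# Catalyst can optimize native SQL, while a Python UDF is a black box to it.
CATEGORY_EXPR = (
    "CASE WHEN price < 10.0 THEN 'Low' "
    "WHEN price < 15.0 THEN 'Medium' "
    "ELSE 'High' END"
)

query1 = f"SELECT name, price, quantity, {CATEGORY_EXPR} AS category FROM products"
query2 = f"SELECT name, {CATEGORY_EXPR} AS category FROM products WHERE quantity > 3"

print(query2)
```

Each query string would then be passed to `spark.sql(...)` exactly as in the commit; the labeling logic still lives in one place.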