Using Spark clusters to build efficient data pipelines + Code
PySpark is a core skill requirement for many data science positions. Its popularity has risen with the need to process big data and the increasing adoption of cluster/parallel computing. Spark is a powerful framework for directing parallel computation across a cluster, and since the majority of data science professionals are already familiar with Python and SQL, PySpark is a natural fit: a Pythonic implementation with syntax that reads much like SQL.
PySpark is a Python wrapper around Spark, exposing the Spark API through Python.
Note: To use PySpark, you need a Spark cluster configured either locally or on Databricks.
1. On a Mac with an M1 chip, first install apache-spark using Homebrew: brew install apache-spark. This places a Spark instance at /opt/homebrew/Cellar/apache-spark/3.3.1/bin/. I use the PyCharm IDE; the screenshot below shows those configs, and the sketch after this list covers the equivalent environment settings.
2. When I build production-ready data pipelines, I use Databricks. Databricks gives you scalable compute to run and monitor Spark clusters and to automate ETL jobs written in PySpark.
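For the local setup, below is a minimal sketch of the environment variables PySpark typically needs in order to find the Homebrew Spark install (set them in PyCharm's run configuration or your shell profile). The exact paths are assumptions; adjust them to your installed Spark version and Python interpreter.
# Minimal sketch: point PySpark at the Homebrew Spark installation.
# The paths below are assumptions; adjust to your Spark version and Python.
import os
os.environ["SPARK_HOME"] = "/opt/homebrew/Cellar/apache-spark/3.3.1/libexec"
os.environ["PYSPARK_PYTHON"] = "python3"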
Initialize a Spark Session
Before initializing a session, install the pyspark library with pip install pyspark, then import what you need:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("My_App_Name").getOrCreate()
In the code above, I named the session app My_App_Name; you may change it to any name you prefer when creating your Spark session.
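The builder also accepts configuration values if you want to tune the session at creation time. The snippet below is only an illustrative sketch; the spark.sql.shuffle.partitions setting is an example, not something the setup above requires.
# Illustrative sketch: pass an extra config value when building the session
# and print the Spark version the session is running.
spark = (SparkSession.builder
         .appName("My_App_Name")
         .config("spark.sql.shuffle.partitions", "8")  # example value; tune for your cluster
         .getOrCreate())
print(spark.version)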
Read Parquet/CSV file
Parquet is a column-oriented data storage format native to the Hadoop ecosystem (HDFS, the Hadoop Distributed File System). Using Parquet files gives you compute efficiency when processing big datasets.
CSV is a simple Comma-Separated Values format, frequently used when working with data in Excel or other spreadsheet tools.
Cloud processing costs depend on data storage, query run time, and the amount of data scanned, which makes Parquet the efficient way to store and process big datasets. Using the code below, you can read a CSV spreadsheet or a Parquet file from cloud storage.
# Read a CSV file
df = (spark.read
      .format("csv")
      .option("delimiter", ",")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("data_table.csv")
      )
# Read a Parquet file (Parquet stores its own schema, so the CSV options are not needed)
df = (spark.read
      .format("parquet")
      .load("data_table.parquet")
      )
# or
df = spark.read.parquet("data_table.parquet")
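If you already know the layout of the file, you can also supply an explicit schema instead of relying on inferSchema, which avoids an extra pass over the data. The column names and types below are hypothetical, shown only to illustrate the pattern.
# Sketch: read a CSV file with an explicit schema (hypothetical columns).
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
book_schema = StructType([
    StructField("Title", StringType(), True),
    StructField("No of pages", IntegerType(), True),
])
df = (spark.read
      .format("csv")
      .option("header", "true")
      .schema(book_schema)
      .load("data_table.csv")
      )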
Select, Alias, Filter, GroupBy, Sort
The most common operations in Exploratory Data Analysis (EDA) involve selecting the variables of interest, filtering with conditions, grouping into categories, and sorting for a quick understanding of the data.
The code below shows how to chain all of these operations:
df = (spark.read.parquet("data_table.parquet")
      .select(F.col("title").alias("Title"), "No of pages")
      .where(F.col("Title") == "Calculus")
      .groupBy("Title")
      .count()  # produces a "count" column; "No of pages" is no longer available after this
      .orderBy(F.col("count"), ascending=False)
      )
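Because PySpark syntax reads much like SQL, the same query can also be expressed through spark.sql. This is a hedged sketch; the temporary view name books_view is just an illustrative choice.
# Sketch: the same select/filter/group/sort expressed in SQL.
books = spark.read.parquet("data_table.parquet")
books.createOrReplaceTempView("books_view")  # hypothetical view name
df_sql = spark.sql("""
    SELECT title AS Title, COUNT(*) AS count
    FROM books_view
    WHERE title = 'Calculus'
    GROUP BY title
    ORDER BY count DESC
""")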
Joins (left, right, inner, outer, self)
In a database, we store multiple datasets. Joins play a key role in unlocking insights by combining datasets on primary keys or other common variables.
Even though left and right joins are the most common, it is also important to know how to perform inner joins, outer joins, and unions (a sketch of those follows the example below).
# DF, ratings, and authors are assumed to be existing DataFrames that share the "Title" column.
df = (df.join(DF.select("Title", "No of Pages", "Ratings", "Author"),
              on="Title",
              how="left")
      .join(ratings, on="Title", how="right")
      .join(authors, on="Title", how="left")
      )
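For completeness, here is a small hedged sketch of the inner join, outer join, and union patterns mentioned above; df_a and df_b are hypothetical DataFrames with a common "Title" column (and, for the union, matching columns overall).
# Sketch: inner join, full outer join, and union of two hypothetical DataFrames.
inner_df = df_a.join(df_b, on="Title", how="inner")  # rows present in both
outer_df = df_a.join(df_b, on="Title", how="outer")  # rows present in either
union_df = df_a.unionByName(df_b)                    # stack rows, matching columns by name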
Write Parquet/CSV file
Once the data processing is done, you can write your Spark DataFrame out as a Parquet file or a CSV file.
If you want to share the data with clients who use spreadsheets, CSV is the best choice. If you plan to add the data to a database and also make it accessible through HDFS, write the DataFrame in Parquet format.
(df.write
 .mode("overwrite")
 .option("overwriteSchema", "true")
 .option("path", "/FileStore/tables/Books_DF.parquet")
 .saveAsTable("Database_Name.Books_DF"))
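The snippet above registers the DataFrame as a table in the metastore, backed by files at the given path (overwriteSchema is a Delta Lake option commonly used on Databricks). To write a plain Parquet or CSV file directly, a minimal sketch looks like this; the output paths are illustrative.
# Sketch: write the DataFrame directly as Parquet or CSV (illustrative paths).
df.write.mode("overwrite").parquet("/FileStore/tables/Books_DF_parquet")
(df.write
 .mode("overwrite")
 .option("header", "true")
 .csv("/FileStore/tables/Books_DF_csv"))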
What's Next with PySpark
PySpark is a powerful library for achieving performance gains and processing large datasets. The functions I shared in this blog are the basics; there is so much more you can do with PySpark.
In my upcoming blog, PySpark for Data Science (Part 2), I will cover advanced PySpark functionality that might interest you, including building custom schemas, statistical analysis, data pipeline jobs, MLflow, and more.