From Pandas to PySpark
We recently converted a Python script that relied on Pandas DataFrames to use PySpark DataFrames instead. The goal was to move data manipulation from the single-machine context of Pandas to the distributed processing capabilities of PySpark, improving scalability and efficiency.
Pandas is an indispensable library for data scientists and a go-to tool for anyone who needs to manipulate and analyze data effectively.
However, while Pandas is useful and covers a wide range of functionality, its limitations become evident on sizable datasets: it runs on a single machine, whereas PySpark distributes the computation across multiple machines. This article aims to make the move easier for newcomers to PySpark by offering code snippets equivalent to common Pandas methods. With these examples at hand, navigating PySpark should be a smoother experience.
Getting started
Let's start by importing the libraries we'll need throughout this article.
import pandas as pd
import pyspark.sql.functions as F
The entry point into PySpark functionality is the SparkSession class. Through a SparkSession instance, you can create DataFrames, apply all kinds of transformations, read and write files, and so on. To define a SparkSession, you can use the following:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName('SparkByExamples.com') \
    .getOrCreate()
With all the preparations in place, we can now delve into the exciting comparison between Pandas and PySpark. Let’s explore the key differences and corresponding counterparts between these two powerful tools.
DataFrame creation
First, let’s define a data sample we’ll be using:
columns = ["employee","department","state","salary","age"] data = [("Alain","Sales","Paris",60000,34), ("Ahmed","Sales","Lyon",80000,45), ("Ines","Sales","Nice",55000,30), ("Fatima","Finance","Paris",90000,28), ("Marie","Finance","Nantes",100000,40)]
To create a Pandas DataFrame, we can use the following:
df = pd.DataFrame(data=data, columns=columns)

# Show a few lines
df.head(2)
PySpark
df = spark.createDataFrame(data).toDF(*columns)

# Show a few lines
df.limit(2).show()
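If you already have a Pandas DataFrame, you can also convert between the two representations directly. Below is a small sketch, where pdf and sdf are just placeholder variable names; both createDataFrame and toPandas are part of the standard PySpark API:

# Pandas -> PySpark (pdf is an existing Pandas DataFrame)
sdf = spark.createDataFrame(pdf)

# PySpark -> Pandas: this collects all the data on the driver, so only do it for small results
pdf = sdf.toPandas()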
Specifying column types
Pandas
types_dict = {
    "employee": pd.Series([r[0] for r in data], dtype='str'),
    "department": pd.Series([r[1] for r in data], dtype='str'),
    "state": pd.Series([r[2] for r in data], dtype='str'),
    "salary": pd.Series([r[3] for r in data], dtype='int'),
    "age": pd.Series([r[4] for r in data], dtype='int')
}

df = pd.DataFrame(types_dict)
You can check your types by executing this line:
df.dtypes
PySpark
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("employee", StringType(), True),
    StructField("department", StringType(), True),
    StructField("state", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("age", IntegerType(), True)
])

df = spark.createDataFrame(data=data, schema=schema)
You can check your DataFrame's schema by executing:
df.dtypes
# OR
df.printSchema()
Reading and writing files
Reading and writing files are very similar in Pandas and PySpark. The syntax is as follows for each:
Pandas
# header=0 uses the first line of the file as column names
df = pd.read_csv(path, sep=';', header=0)
df.to_csv(path, sep=';', index=False)
PySpark
df = spark.read.csv(path, sep=';')

# n is the desired number of output files
df.coalesce(n).write.mode('overwrite').csv(path, sep=';')
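By default, spark.read.csv treats the first line as data and reads every column as a string, so the result is not quite the same as pd.read_csv. The sketch below, which assumes the file has a header row, uses a couple of standard options to get closer to the Pandas behaviour:

# header=True uses the first line as column names;
# inferSchema=True asks Spark to detect column types (at the cost of an extra pass over the data)
df = spark.read.csv(path, sep=';', header=True, inferSchema=True)

# coalesce(1) produces a single output file, and header=True writes the column names back
df.coalesce(1).write.mode('overwrite').csv(path, sep=';', header=True)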
Add a column
Pandas
score = [1, 2, 5, 7, 4]
df['score'] = score
PySpark
PySpark has no direct equivalent of assigning a Python list to a new column: withColumn adds a column computed from an expression, and F.lit wraps a constant, so every row gets the same value.

# F.lit adds a constant column; withColumn returns a new DataFrame and does not modify df in place
df_with_constant = df.withColumn('score', F.lit(0))
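If you do need one value per row, as in the Pandas example, a common workaround is to join on a generated row index. The sketch below is one possible approach rather than a one-liner equivalent; it assumes the current row order of df is an acceptable key and uses only standard PySpark functions (monotonically_increasing_id, row_number):

from pyspark.sql import Window

score = [1, 2, 5, 7, 4]

# Number the existing rows 1..n in their current order
# (the window has no partition, so this pulls the data onto a single partition)
w = Window.orderBy(F.monotonically_increasing_id())
indexed = df.withColumn('row_idx', F.row_number().over(w))

# Turn the Python list into a small DataFrame with the same index
scores_df = spark.createDataFrame(
    [(i + 1, s) for i, s in enumerate(score)],
    ['row_idx', 'score']
)

# Join on the index and drop the helper column
df_with_score = indexed.join(scores_df, on='row_idx').drop('row_idx')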
Filtering
Selecting certain columns in Pandas is done as follows:
Pandas
columns_subset = ['employee', 'salary']

df[columns_subset].head()
df.loc[:, columns_subset].head()
Whereas in PySpark, we need to use the select method with a list of columns:
PySpark
df.select(['employee', 'salary']).show(5)
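Filtering rows on a condition is also very close in the two libraries. Here is a short sketch on the sample data, where F.col is the usual way to reference a column inside a PySpark expression:

# Pandas: boolean indexing
df[df['salary'] > 60000]

# PySpark: filter (or its alias where) with a column expression
df.filter(F.col('salary') > 60000).show()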
Concatenate dataframes
Pandas
df = pd.concat([df1, df2], ignore_index=True)
PySpark
df = df1.union(df2)
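Note that union matches columns by position, whereas pd.concat aligns them by name. If the two DataFrames might have their columns in a different order, unionByName is the safer counterpart; a small sketch (allowMissingColumns requires Spark 3.1 or later):

# Match columns by name instead of position
df = df1.unionByName(df2)

# Fill columns that exist on only one side with nulls (Spark 3.1+)
df = df1.unionByName(df2, allowMissingColumns=True)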
Aggregations
To perform aggregations, the syntax is almost identical between Pandas and PySpark:
Pandas
df.groupby('department').agg({'employee': 'count', 'salary':'max', 'age':'mean'})
PySpark
df.groupBy('department').agg({'employee': 'count', 'salary':'max', 'age':'mean'})
However, the results need some tweaking to look the same in Pandas and PySpark.
In Pandas, the column you group by becomes the index. To get it back as a regular column, we need to apply the reset_index method:
Pandas
df.groupby('department').agg({'employee': 'count', 'salary':'max', 'age':'mean'}).reset_index()
In PySpark, the column names in the resulting DataFrame are modified to reflect the aggregation performed (for example count(employee) and max(salary)). If you want to control these names, you'll need to use the alias method.
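Here is a sketch that uses explicit aggregation expressions from pyspark.sql.functions (imported as F at the beginning of this article) so that you control the output column names yourself:

df.groupBy('department').agg(
    F.count('employee').alias('employee_count'),
    F.max('salary').alias('max_salary'),
    F.avg('age').alias('mean_age')
).show()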
Apply a transformation over a column
Pandas
df['new_score'] = df['score'].apply(lambda x: x*5)
PySpark
from pyspark.sql.types import FloatType

# The lambda must return the declared type (hence the cast to float),
# and withColumn returns a new DataFrame, so keep the result
df = df.withColumn('new_score', F.udf(lambda x: float(x * 5), FloatType())('score'))
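For simple arithmetic like this, you do not actually need a UDF at all: a native column expression is shorter and lets Spark optimize the computation. A minimal sketch, assuming the score column from the earlier section exists:

# Native column expression: no Python UDF overhead
df = df.withColumn('new_score', F.col('score') * 5)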
Conclusion
In conclusion, the strong similarity in syntax between Pandas and PySpark greatly facilitates the transition from one framework to the other, easing the learning curve and minimizing the challenges of adapting to a new environment.
PySpark offers a significant advantage when dealing with large datasets thanks to its distributed, parallel computing. For small datasets, however, the versatile and widely used Pandas library remains the quicker and more convenient choice. Please refer to our blogs for more insightful content, and leave a comment if you have any questions on the topic.