PySpark Examples in Datazone Transforms
Welcome to this focused guide on utilizing PySpark within the Datazone platform. This document is tailored to illustrate how PySpark can be seamlessly integrated into Datazone Transforms, enabling efficient data processing and transformation.
Using three key datasets (orders, SKUs, and customers) as our foundation, we will explore various PySpark operations. Each example is designed to demonstrate practical applications in data transformation, showcasing the versatility and power of PySpark in a Datazone environment.
From basic operations like projection and filtering to more advanced techniques such as joins, unions, and aggregations, each section of this guide offers concise yet comprehensive insights into PySpark's capabilities. Whether you're a beginner or an experienced user, these examples provide a clear pathway to enhance your data processing workflows in Datazone using PySpark.
In the upcoming examples, the terms dataset and dataframe are used interchangeably. Whenever either term is mentioned, it refers to the same concept: a structured collection of data.
Please note that Dataset(id="dataset_orders_id") in the code examples is only an illustrative placeholder. Replace the string dataset_orders_id (and the other placeholder dataset IDs) with the actual identifier of your own dataset when implementing these examples in your environment. This ensures that the code correctly references and interacts with your dataset.
Projection
Projection in PySpark is used to select specific columns from a DataFrame. This operation is similar to the SELECT statement in SQL. It is useful when you want to work with only a subset of columns in your dataset.
Example Use Case: Selecting only the order_id and order_date columns from an orders DataFrame.
from datazone import transform, Input, Dataset
@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def select_orders(orders):
    return orders.select("order_id", "order_date")
Filter
The filter operation is used to retrieve rows from a DataFrame that meet a specific condition. This is akin to the WHERE clause in SQL. It allows for both simple and complex filtering criteria.
Example Use Case: Fetching orders made after a certain date.
Filtering records based on a condition.
from datazone import transform, Input, Dataset
@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def filter_orders(orders):
    return orders.filter(orders.order_date > "2023-01-01")
Filtering using conditions on multiple columns.
from datazone import transform, Input, Dataset
@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def filter_orders_multi(orders):
    return orders.filter((orders.order_date > "2023-01-01") & (orders.customer_id == 102))
Column Rename
Column renaming is used to change the name of a column in a DataFrame. This is particularly useful for improving readability or when column names need to conform to certain naming conventions.
Example Use Case: Renaming order_date to date_of_order for clarity.
from datazone import transform, Input, Dataset
@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def rename_order_column(orders):
    return orders.withColumnRenamed("order_date", "date_of_order")
On-the-fly Columns
Creating on-the-fly columns involves adding new columns to a DataFrame, often with calculated or static values. This is useful for adding derived metrics or flags to your data.
Example Use Case: Adding a new column status to an orders DataFrame to indicate processing status.
from datazone import transform, Input, Dataset
from pyspark.sql.functions import lit
@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def add_new_column(orders):
    return orders.withColumn("status", lit("processed"))
Sorting
Sorting refers to arranging data in a specified order. In PySpark, the orderBy function is used to sort the DataFrame based on one or more columns, either in ascending or descending order.
Detailed Usage:
- orderBy("column"): Sorts the DataFrame in ascending order based on the specified column.
- orderBy("column", ascending=False): Sorts in descending order.
from datazone import transform, Input, Dataset
@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def sort_orders_by_date(orders):
    # Ascending by default; pass ascending=False for newest first
    return orders.orderBy("order_date")
Joins
Join operations are used to combine two DataFrames based on a common key or condition. This is similar to joins in SQL and is essential for merging related datasets.
Types of Joins:
- Inner Join: Returns rows that have matching values in both DataFrames.
- Left/Right Outer Join: Returns all rows from the left/right DataFrame and matched rows from the other DataFrame.
- Full Outer Join: Returns all rows from both DataFrames, with nulls filled in where one side has no match.
- Anti Join: Returns rows from the left DataFrame that do not have matching keys in the right DataFrame.
Example Use Case:
- Inner Join: Find customers who have placed orders (common in both datasets).
- Left Outer Join: Find all customers and their order details, if any.
- Right Outer Join: Find all orders and their customer details, if any.
- Anti Join: Find customers who have not placed any orders.
from datazone import transform, Input, Dataset
@transform(input_mapping={"customers": Input(Dataset(id="dataset_customers_id")),
                          "orders": Input(Dataset(id="dataset_orders_id"))})
def perform_various_joins(customers, orders):
    # Inner Join: Customers with orders
    inner_join_df = customers.join(orders, customers.customer_id == orders.customer_id, "inner")
    # Left Outer Join: All customers, with order details if available
    left_outer_join_df = customers.join(orders, customers.customer_id == orders.customer_id, "left_outer")
    # Right Outer Join: All orders, with customer details if available
    right_outer_join_df = customers.join(orders, customers.customer_id == orders.customer_id, "right_outer")
    # Anti Join: Customers without orders
    anti_join_df = customers.join(orders, customers.customer_id == orders.customer_id, "left_anti")
    return inner_join_df, left_outer_join_df, right_outer_join_df, anti_join_df
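To make the difference between the join types concrete, the following plain-Python sketch models them on two tiny, made-up tables. It is not PySpark code and does not run on Datazone: the sample rows and the helper name join_rows are hypothetical, and unmatched sides are represented by None in the same way a join result carries nulls.

```python
# Plain-Python model of join semantics; not PySpark code.
# Hypothetical sample data: customer 3 has no orders, order 12 has an unknown customer.
customers = [{"customer_id": 1}, {"customer_id": 2}, {"customer_id": 3}]
orders = [
    {"order_id": 10, "customer_id": 1},
    {"order_id": 11, "customer_id": 1},
    {"order_id": 12, "customer_id": 4},
    {"order_id": 13, "customer_id": 2},
]


def join_rows(left, right, key, how):
    """Return (left_row, right_row) pairs; None marks a side with no match."""
    pairs = []
    for left_row in left:
        matches = [r for r in right if r[key] == left_row[key]]
        if how == "left_anti":
            # Keep only left rows that have no partner on the right.
            if not matches:
                pairs.append((left_row, None))
            continue
        pairs.extend((left_row, right_row) for right_row in matches)
        if not matches and how in ("left_outer", "full_outer"):
            pairs.append((left_row, None))
    if how in ("right_outer", "full_outer"):
        # Add right rows whose key never appears on the left.
        left_keys = {left_row[key] for left_row in left}
        pairs.extend((None, r) for r in right if r[key] not in left_keys)
    return pairs


for how in ("inner", "left_outer", "right_outer", "full_outer", "left_anti"):
    print(how, len(join_rows(customers, orders, "customer_id", how)))
```

With this sample data, the inner join yields three pairs, the left and right outer joins four each, the full outer join five, and the anti join only customer 3.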
Union
The union operation is used to combine two DataFrames with the same schema (i.e., number and type of columns) by appending the rows of one DataFrame to another.
Example Use Case: Merging two datasets of orders from different sources or time periods into a single DataFrame.
from datazone import transform, Input, Dataset
@transform(input_mapping={"orders1": Input(Dataset(id="dataset_orders_id1")),
                          "orders2": Input(Dataset(id="dataset_orders_id2"))})
def union_orders(orders1, orders2):
    return orders1.union(orders2)
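One detail worth keeping in mind is that PySpark's union matches columns by position, not by name, and keeps duplicate rows; unionByName matches by name instead. The plain-Python sketch below models that difference with tuples. It is not PySpark code, and the sample rows and the helper names union_by_position and union_by_name are hypothetical.

```python
# Plain-Python model; a row is a tuple whose meaning comes from a column list.
columns_a = ["order_id", "customer_id"]
rows_a = [(1, 101)]
columns_b = ["customer_id", "order_id"]  # same columns, different order
rows_b = [(102, 2)]


def union_by_position(rows_left, rows_right):
    # Models positional matching: right-hand rows are appended unchanged.
    return rows_left + rows_right


def union_by_name(cols_left, rows_left, cols_right, rows_right):
    # Models name matching: each right-hand row is reordered to the left-hand layout.
    index = [cols_right.index(col) for col in cols_left]
    return rows_left + [tuple(row[i] for i in index) for row in rows_right]


positional = union_by_position(rows_a, rows_b)
by_name = union_by_name(columns_a, rows_a, columns_b, rows_b)
print(positional)
print(by_name)
```

In the positional result the second row carries the customer ID in the order_id slot, which is why it helps to check that both inputs list their columns in the same order before a union.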
Aggregation
Aggregation operations are used to compute summary statistics or other complex aggregations on a DataFrame. These operations often go hand in hand with group by functionality.
Example Use Case: Counting the total number of orders placed by each customer.
from datazone import transform, Input, Dataset
from pyspark.sql import functions as F
@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def aggregate_total_orders_per_customer(orders):
    return orders.groupBy("customer_id").agg(F.count("order_id").alias("total_orders"))
In this code snippet:
- groupBy("customer_id"): Groups the data by the customer_id column.
- agg(...): Performs the specified aggregation, which here counts the order_id values in each group.
- alias("total_orders"): Renames the result of the aggregation to total_orders for clarity.
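As a rough illustration of what the grouped count computes, here is a plain-Python sketch on made-up rows. It is not PySpark code; the sample data and the helper name count_non_null are hypothetical. It also models the fact that counting a column skips null values.

```python
# Plain-Python model of groupBy(...).agg(count(column)); not PySpark code.
orders = [
    {"order_id": 1, "customer_id": 101},
    {"order_id": 2, "customer_id": 101},
    {"order_id": None, "customer_id": 101},  # null order_id is not counted
    {"order_id": 3, "customer_id": 102},
]


def count_non_null(rows, group_key, counted_key):
    totals = {}
    for row in rows:
        group = row[group_key]
        totals.setdefault(group, 0)  # every group appears, even with zero matches
        if row[counted_key] is not None:
            totals[group] += 1
    return totals


total_orders = count_non_null(orders, "customer_id", "order_id")
print(total_orders)
```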
Pivot
Pivoting is used to rotate data from a long format to a wide format. It can summarize data and is useful in data reshaping and analysis.
Detailed Usage:
- groupBy("column").pivot("pivot_column"): Groups the data by a column and pivots on another column.
- Functions like sum(), count(), etc., can be applied to the pivoted data for aggregation.
from datazone import transform, Input, Dataset
from pyspark.sql import functions as F
@transform(input_mapping={"sales": Input(Dataset(id="dataset_sales_id"))})
def pivot_sales_data(sales):
    return sales.groupBy("year").pivot("category").sum("sales")
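The long-to-wide reshaping can be pictured with a small plain-Python sketch. It is not PySpark code: the sample sales rows and the helper name pivot_sum are hypothetical, and combinations with no data are left as None, mirroring the nulls that appear in a pivoted DataFrame.

```python
# Plain-Python model of groupBy("year").pivot("category").sum("sales"); not PySpark code.
sales = [
    {"year": 2022, "category": "books", "sales": 10},
    {"year": 2022, "category": "toys", "sales": 5},
    {"year": 2023, "category": "books", "sales": 7},
    {"year": 2022, "category": "books", "sales": 3},
]


def pivot_sum(rows, group_key, pivot_key, value_key):
    # Every distinct pivot value becomes a column in the wide result.
    categories = sorted({row[pivot_key] for row in rows})
    wide = {}
    for row in rows:
        out = wide.setdefault(row[group_key], {c: None for c in categories})
        out[row[pivot_key]] = (out[row[pivot_key]] or 0) + row[value_key]
    return wide


wide_sales = pivot_sum(sales, "year", "category", "sales")
print(wide_sales)
```

Each year ends up as one row with one entry per category; 2023 has no toys sales in the sample, so that cell stays empty.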
Window Functions
Window functions perform calculations across a set of rows related to the current row, such as the rows that share its partition key. This is useful for running totals, moving averages, ranking, etc.
Detailed Usage:
- Define a Window specification using Window.partitionBy("column").orderBy("other_column").
- Apply window functions like rank(), row_number(), lead(), lag(), etc.
from datazone import transform, Input, Dataset
from pyspark.sql import functions as F
from pyspark.sql.window import Window
@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def rank_orders(orders):
    windowSpec = Window.partitionBy("customer_id").orderBy("order_date")
    return orders.withColumn("rank", F.rank().over(windowSpec))
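Ranking functions differ in how they treat ties, which is easy to overlook. The plain-Python sketch below models rank, dense_rank, and row_number over a window partitioned by customer_id and ordered by order_date. It is not PySpark code; the sample rows and the helper name add_rankings are hypothetical.

```python
# Plain-Python model of ranking over a window; not PySpark code.
from itertools import groupby

orders = [
    {"customer_id": 101, "order_date": "2023-01-05"},
    {"customer_id": 101, "order_date": "2023-01-05"},  # tie with the row above
    {"customer_id": 101, "order_date": "2023-02-01"},
    {"customer_id": 102, "order_date": "2023-01-10"},
]


def add_rankings(rows, partition_key, order_key):
    ranked = []
    ordered = sorted(rows, key=lambda r: (r[partition_key], r[order_key]))
    for _, partition in groupby(ordered, key=lambda r: r[partition_key]):
        previous, rank, dense_rank = object(), 0, 0
        for row_number, row in enumerate(partition, start=1):
            if row[order_key] != previous:
                rank = row_number  # rank skips ahead after a group of ties
                dense_rank += 1    # dense_rank never leaves gaps
                previous = row[order_key]
            ranked.append(
                {**row, "row_number": row_number, "rank": rank, "dense_rank": dense_rank}
            )
    return ranked


ranked_orders = add_rankings(orders, "customer_id", "order_date")
for row in ranked_orders:
    print(row)
```

For customer 101 the two orders on the same date share rank 1, the next order gets rank 3 but dense_rank 2, and row_number simply counts 1, 2, 3.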
UDFs (User Defined Functions)
UDFs allow you to extend the functionality of PySpark by defining custom functions in Python. These functions can then be used in DataFrame transformations.
Detailed Usage:
- Define a Python function.
- Register it as a UDF.
- Apply the UDF to a DataFrame column.
from datazone import transform, Input, Dataset
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType
@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def apply_discount(orders):
    # Wrap a Python function as a UDF returning a float; null prices stay null
    calculate_discount_udf = F.udf(lambda price: None if price is None else float(price) * 0.9, FloatType())
    return orders.withColumn("discounted_price", calculate_discount_udf(F.col("price")))
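Because a UDF wraps an ordinary Python function, the logic can be written and checked on its own before it is registered. The sketch below is one way to do that; the function name calculate_discount and its rate parameter are hypothetical and not part of the Datazone or PySpark APIs. It guards against None, since a Python UDF receives null column values as None.

```python
# Plain-Python function that could later be wrapped with F.udf(...); no Spark needed to test it.
def calculate_discount(price, rate=0.1):
    """Return the price after applying the discount rate; pass None through unchanged."""
    if price is None:
        return None
    return float(price) * (1 - rate)


print(calculate_discount(200))
print(calculate_discount(None))
```

Keeping the function separate from the transform makes it straightforward to unit-test the business rule without starting a Spark job.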
Handling Missing Data
Dealing with null or missing data is a common task. PySpark provides functions to drop, fill, or replace these missing values.
Detailed Usage:
- fillna(): Fills null values with specified value(s).
- dropna(): Drops rows with null values.
from datazone import transform, Input, Dataset
from pyspark.sql import functions as F
@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def fill_missing_prices(orders):
    return orders.fillna({"price": 0})
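The choice between filling and dropping changes which rows survive. The plain-Python sketch below models both on made-up rows. It is not PySpark code; the sample data and the helper names fill_nulls and drop_nulls are hypothetical.

```python
# Plain-Python model of filling versus dropping nulls; not PySpark code.
orders = [
    {"order_id": 1, "price": 20.0, "customer_id": 101},
    {"order_id": 2, "price": None, "customer_id": 102},
    {"order_id": 3, "price": 15.0, "customer_id": None},
]


def fill_nulls(rows, defaults):
    # Replaces nulls only in the columns named in defaults.
    return [
        {k: (defaults[k] if v is None and k in defaults else v) for k, v in row.items()}
        for row in rows
    ]


def drop_nulls(rows, subset=None):
    # Drops a row if any checked column is null (all columns when subset is None).
    return [row for row in rows if all(row[k] is not None for k in (subset or row))]


filled = fill_nulls(orders, {"price": 0})
dropped_any = drop_nulls(orders)
dropped_price = drop_nulls(orders, subset=["price"])
print(filled)
print(dropped_any)
print(dropped_price)
```

Filling keeps all three rows and touches only the price column, dropping on any null keeps only order 1, and restricting the check to price keeps orders 1 and 3.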
A Comprehensive Transform Example
To demonstrate a scenario where multiple PySpark operations are used within a single transform block, let's consider a hypothetical analysis on an orders dataset. In this scenario, we will:
- Filter the orders for a specific year.
- Sort these orders by order date.
- Add a new column indicating the order size category.
- Calculate the total value of orders for each customer.
- Finally, pivot this data to show the sum of order values per month for each customer.
Here is how you can implement this using the transform decorator and datazone module:
from datazone import transform, Input, Dataset
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def comprehensive_orders_analysis(orders):
    # Filter orders from a specific year
    filtered_orders = orders.filter(F.year("order_date") == 2023)
    # Sort by order date
    sorted_orders = filtered_orders.orderBy("order_date")
    # Define a UDF to categorize order size
    def order_size_category(order_value):
        if order_value is None:
            return None
        if order_value < 100:
            return "Small"
        elif order_value <= 500:
            return "Medium"
        else:
            return "Large"
    order_size_udf = F.udf(order_size_category, StringType())
    categorized_orders = sorted_orders.withColumn("order_size", order_size_udf(F.col("order_value")))
    # Derive the month from order_date so it is available for the pivot below
    orders_with_month = categorized_orders.withColumn("month", F.month("order_date"))
    # Aggregate total value of orders for each customer, keeping the month
    total_value_per_customer = orders_with_month.groupBy("customer_id", "month").agg(F.sum("order_value").alias("total_value"))
    # Pivot data to show sum of order values per month for each customer
    pivoted_data = total_value_per_customer.groupBy("customer_id").pivot("month").sum("total_value")
    return pivoted_data
Explanation of the Code:
- Filtering: Only orders from the year 2023 are included.
- Sorting: Orders are sorted by their date.
- UDF Application: A user-defined function categorizes each order based on its value.
- Aggregation: The total value of orders is computed for each customer.
- Pivoting: The data is then pivoted to show the sum of order values per month for each customer.
Note:
- Ensure that dataset_orders_id is replaced with the actual dataset ID in your implementation.
- The code assumes the presence of columns like order_date, order_value, etc., in your orders DataFrame.
- UDFs might have performance implications; consider using built-in functions wherever possible.