Welcome to this focused guide on utilizing PySpark within the Datazone platform. This document is tailored to illustrate how PySpark can be seamlessly integrated into Datazone Transforms, enabling efficient data processing and transformation.

Using three key datasets - orders, SKUs, and customers - as our foundation, we will explore various PySpark operations. Each example is designed to demonstrate practical applications in data transformation, showcasing the versatility and power of PySpark in a Datazone environment.

From basic operations like projection and filtering to more advanced techniques such as joins, unions, and aggregations, each section of this guide offers concise yet comprehensive insights into PySpark’s capabilities. Whether you’re a beginner or an experienced user, these examples provide a clear pathway to enhance your data processing workflows in Datazone using PySpark.

In the upcoming examples, the terms dataset and dataframe are used interchangeably. This means that whenever either term is mentioned, it refers to the same concept of a structured collection of data within our context.

Please note that the Dataset(id="dataset_orders_id") calls in the code examples use placeholder identifiers. Replace each placeholder (dataset_orders_id, dataset_customers_id, and so on) with the actual ID of the corresponding dataset in your environment so that the code references the right data.

Projection

Projection in PySpark is used to select specific columns from a DataFrame. This operation is similar to the SELECT statement in SQL. It is useful when you want to work with only a subset of columns in your dataset.

Example Use Case: Selecting only the order_id and order_date columns from an orders DataFrame.

filename="transform.py"
from datazone import transform, Input, Dataset

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def select_orders(orders):
    return orders.select("order_id", "order_date")

Filter

The filter operation is used to retrieve rows from a DataFrame that meet a specific condition. This is akin to the WHERE clause in SQL. It allows for both simple and complex filtering criteria.

Example Use Case: Fetching orders made after a certain date.

Filtering records based on a condition.

filename="transform.py"
from datazone import transform, Input, Dataset

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def filter_orders(orders):
    return orders.filter(orders.order_date > "2023-01-01")

Filtering using conditions on multiple columns.

filename="transform.py"
from datazone import transform, Input, Dataset

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def filter_orders_multi(orders):
    return orders.filter((orders.order_date > "2023-01-01") & (orders.customer_id == 102))
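
The parentheses around each condition matter. In Python, `&` binds more tightly than comparison operators such as `>` and `==`, so an unparenthesized expression is grouped differently from what was intended. The following plain-Python sketch (no Spark required; the variable names and values are made up for illustration) shows the difference in grouping with ordinary integers:

```python
# Plain Python, no Spark needed: `&` binds more tightly than `>` and `==`.
quantity = 1
customer_id = 4

# Parenthesized: each comparison is evaluated first, then the results are combined.
with_parens = (quantity > 0) & (customer_id == 4)

# Unparenthesized: parsed as quantity > (0 & customer_id) == 4,
# which is the chained comparison (quantity > 0) and (0 == 4).
without_parens = quantity > 0 & customer_id == 4

print(with_parens)     # True
print(without_parens)  # False
```

The same grouping rules apply to PySpark column expressions, which is why each condition is wrapped in its own parentheses before being combined with & (and) or | (or).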

Column Rename

Column renaming is used to change the name of a column in a DataFrame. This is particularly useful for improving readability or when column names need to conform to certain naming conventions.

Example Use Case: Renaming order_date to date_of_order for clarity.

filename="transform.py"
from datazone import transform, Input, Dataset

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def rename_order_column(orders):
    return orders.withColumnRenamed("order_date", "date_of_order")

On-the-fly Columns

Creating on-the-fly columns involves adding new columns to a DataFrame, often with calculated or static values. This is useful for adding derived metrics or flags to your data.

Example Use Case: Adding a new column status to an orders DataFrame to indicate processing status.

filename="transform.py"
from datazone import transform, Input, Dataset
from pyspark.sql.functions import lit

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def add_new_column(orders):
    return orders.withColumn("status", lit("processed"))


Sorting

Sorting refers to arranging data in a specified order. In PySpark, the orderBy function is used to sort the DataFrame based on one or more columns, either in ascending or descending order.

Detailed Usage:

  • orderBy("column"): Sorts the DataFrame in ascending order based on the specified column.
  • orderBy("column", ascending=False): Sorts in descending order.
filename="transform.py"
from datazone import transform, Input, Dataset

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def sort_orders_by_date(orders):
    return orders.orderBy("order_date")

Joins

Join operations are used to combine two DataFrames based on a common key or condition. This is similar to joins in SQL and is essential for merging related datasets.

Types of Joins:

  • Inner Join: Returns rows that have matching values in both DataFrames.
  • Left/Right Outer Join: Returns all rows from the left/right DataFrame and matched rows from the other DataFrame.
  • Full Outer Join: Returns all rows from both DataFrames, with nulls in the columns of whichever side has no match.
  • Anti Join: Returns rows from the left DataFrame that do not have matching keys in the right DataFrame.

Example Use Case:

  • Inner Join: Find customers who have placed orders (common in both datasets).
  • Left Outer Join: Find all customers and their order details, if any.
  • Right Outer Join: Find all orders and their customer details, if any.
  • Anti Join: Find customers who have not placed any orders.
filename="transform.py"
from datazone import transform, Input, Dataset

@transform(input_mapping={"customers": Input(Dataset(id="dataset_customers_id")),
                          "orders": Input(Dataset(id="dataset_orders_id"))})
def perform_various_joins(customers, orders):
    # Inner Join: Customers with orders
    inner_join_df = customers.join(orders, customers.customer_id == orders.customer_id, "inner")

    # Left Outer Join: All customers, with order details if available
    left_outer_join_df = customers.join(orders, customers.customer_id == orders.customer_id, "left_outer")

    # Right Outer Join: All orders, with customer details if available
    right_outer_join_df = customers.join(orders, customers.customer_id == orders.customer_id, "right_outer")

    # Anti Join: Customers without orders
    anti_join_df = customers.join(orders, customers.customer_id == orders.customer_id, "left_anti")

    return inner_join_df, left_outer_join_df, right_outer_join_df, anti_join_df
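
To make the row-level behavior of each join type concrete, here is a plain-Python sketch that mimics the semantics on tiny in-memory lists. It is only a model and not how Spark executes joins; the sample rows and the helper name join_ids are hypothetical.

```python
# Plain-Python model of join semantics (not how Spark executes joins).
# The sample rows and the helper name `join_ids` are hypothetical.
customers = [
    {"customer_id": 1, "name": "Ada"},
    {"customer_id": 2, "name": "Ben"},
    {"customer_id": 3, "name": "Cy"},
]
orders = [
    {"order_id": 10, "customer_id": 1},
    {"order_id": 11, "customer_id": 1},
    {"order_id": 12, "customer_id": 4},  # no matching customer
]


def join_ids(left, right, how):
    """Return (customer name, order_id) pairs; None marks a missing side."""
    matches = [
        (c["name"], o["order_id"])
        for c in left
        for o in right
        if c["customer_id"] == o["customer_id"]
    ]
    right_keys = {o["customer_id"] for o in right}
    left_keys = {c["customer_id"] for c in left}
    unmatched_left = [(c["name"], None) for c in left if c["customer_id"] not in right_keys]
    unmatched_right = [(None, o["order_id"]) for o in right if o["customer_id"] not in left_keys]

    if how == "inner":
        return matches
    if how == "left_outer":
        return matches + unmatched_left
    if how == "right_outer":
        return matches + unmatched_right
    if how == "left_anti":
        # Only left-side columns survive an anti join
        return [name for name, _ in unmatched_left]
    raise ValueError(f"unsupported join type: {how}")


print(join_ids(customers, orders, "inner"))        # [('Ada', 10), ('Ada', 11)]
print(join_ids(customers, orders, "left_outer"))   # adds ('Ben', None), ('Cy', None)
print(join_ids(customers, orders, "right_outer"))  # adds (None, 12)
print(join_ids(customers, orders, "left_anti"))    # ['Ben', 'Cy']
```

Note that a customer with several orders appears once per matching order, and that Spark does not guarantee any particular row order in a join result unless you sort it.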

Union

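The union operation combines two DataFrames with the same schema (i.e., number and type of columns) by appending the rows of one DataFrame to the other. Columns are matched by position rather than by name, and duplicate rows are kept; use unionByName when the column order may differ between the two inputs.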

Example Use Case: Merging two datasets of orders from different sources or time periods into a single DataFrame.

filename="transform.py"
from datazone import transform, Input, Dataset

@transform(input_mapping={"orders1": Input(Dataset(id="dataset_orders_id1")),
                          "orders2": Input(Dataset(id="dataset_orders_id2"))})
def union_orders(orders1, orders2):
    return orders1.union(orders2)
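
Because union lines columns up by position, two inputs with the same columns in a different order can silently produce mixed-up rows. The plain-Python sketch below models this with lists of tuples; the column names and values are hypothetical, and the name-based variant corresponds to what PySpark's unionByName does.

```python
# Plain-Python model: each "DataFrame" is a list of column names plus row tuples.
# Column names and values are hypothetical.
columns_1 = ["order_id", "customer_id"]
rows_1 = [(1, 101)]

columns_2 = ["customer_id", "order_id"]  # same columns, different order
rows_2 = [(102, 2)]

# Positional union: rows are appended as-is under the first input's column names.
by_position = rows_1 + rows_2

# Name-based union: each row of the second input is reordered to match the first.
by_name = rows_1 + [
    tuple(row[columns_2.index(name)] for name in columns_1) for row in rows_2
]

print(by_position)  # [(1, 101), (102, 2)]
print(by_name)      # [(1, 101), (2, 102)]
```

In the positional result, the second row ends up with a customer ID in the order_id column, which is the kind of error that is easy to miss when both columns have the same type.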


Aggregation

Aggregation operations are used to compute summary statistics or other complex aggregations on a DataFrame. These operations often go hand in hand with group by functionality.

Example Use Case: Counting the total number of orders placed by each customer.

filename="transform.py"
from datazone import transform, Input, Dataset
from pyspark.sql import functions as F

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def aggregate_total_orders_per_customer(orders):
    return orders.groupBy("customer_id").agg(F.count("order_id").alias("total_orders"))

In this code snippet:

  • groupBy("customer_id"): Groups the data by the customer_id column.
  • agg(...): Applies the specified aggregation, here counting the non-null order_id values in each group.
  • alias("total_orders"): Renames the result of the aggregation to total_orders for clarity.
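
One detail worth keeping in mind is that counting a specific column skips nulls, while every group still appears in the output. The following plain-Python sketch models that behavior on a few hypothetical rows; it illustrates the semantics only and does not use Spark.

```python
# Hypothetical rows as (customer_id, order_id); None stands for a null order_id.
rows = [(101, "A1"), (101, "A2"), (102, "B1"), (102, None), (103, None)]

# Model of groupBy("customer_id").agg(count("order_id")):
# every customer gets a group, but null order_id values are not counted.
total_orders = {}
for customer_id, order_id in rows:
    total_orders.setdefault(customer_id, 0)
    if order_id is not None:
        total_orders[customer_id] += 1

print(total_orders)  # {101: 2, 102: 1, 103: 0}
```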

Pivot

Pivoting is used to rotate data from a long format to a wide format. It can summarize data and is useful in data reshaping and analysis.

Detailed Usage:

  • groupBy("column").pivot("pivot_column"): Groups the data by a column and pivots on another column.
  • Functions like sum(), count(), etc., can be applied to the pivoted data for aggregation.
filename="transform.py"
from datazone import transform, Input, Dataset

@transform(input_mapping={"sales": Input(Dataset(id="dataset_sales_id"))})
def pivot_sales_data(sales):
    return sales.groupBy("year").pivot("category").sum("sales")
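
To visualize the long-to-wide reshaping, the plain-Python sketch below models a pivot with a sum on a handful of hypothetical rows. It is an illustration of the resulting shape, not Spark code: each distinct category becomes a column, and combinations with no input rows are left empty (null in Spark, None here).

```python
# Hypothetical long-format rows: (year, category, sales).
rows = [
    (2022, "books", 10),
    (2022, "books", 20),
    (2022, "toys", 5),
    (2023, "books", 7),
]

categories = sorted({category for _, category, _ in rows})

# One output row per year, one column per category; None where no input rows exist.
wide = {}
for year, category, sales in rows:
    row = wide.setdefault(year, {c: None for c in categories})
    row[category] = (row[category] or 0) + sales

print(wide)
# {2022: {'books': 30, 'toys': 5}, 2023: {'books': 7, 'toys': None}}
```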

Window Functions

Window functions perform calculations across a set of rows related to the current row (the window) and return a value for every input row instead of collapsing each group into a single row. This is useful for running totals, moving averages, ranking, etc.

Detailed Usage:

  • Define a Window specification using Window.partitionBy("column").orderBy("other_column").
  • Apply window functions like rank(), row_number(), lead(), lag(), etc.
filename="transform.py"
from datazone import transform, Input, Dataset
from pyspark.sql import functions as F
from pyspark.sql.window import Window

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def rank_orders(orders):
    windowSpec = Window.partitionBy("customer_id").orderBy("order_date")
    return orders.withColumn("rank", F.rank().over(windowSpec))
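
The ranking functions differ only in how they treat ties in the ordering column. As a rough illustration, the plain-Python sketch below computes row_number, rank, and dense_rank for a single partition; the order dates are hypothetical and the loop is a model of the semantics rather than Spark code.

```python
# Hypothetical order dates for one customer (one window partition), already sorted.
order_dates = ["2023-01-01", "2023-01-01", "2023-01-05", "2023-02-10"]

row_number, rank, dense_rank = [], [], []
for position, date in enumerate(order_dates, start=1):
    row_number.append(position)
    if position > 1 and date == order_dates[position - 2]:
        # Tie with the previous row: both rank variants repeat the previous value
        rank.append(rank[-1])
        dense_rank.append(dense_rank[-1])
    else:
        rank.append(position)  # skips values after a tie
        dense_rank.append(dense_rank[-1] + 1 if dense_rank else 1)  # never skips

print(row_number)  # [1, 2, 3, 4]
print(rank)        # [1, 1, 3, 4]
print(dense_rank)  # [1, 1, 2, 3]
```

If two orders for the same customer share an order_date, rank() assigns them the same value, so choose row_number() when exactly one row per position is needed.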



UDFs (User Defined Functions)

UDFs allow you to extend the functionality of PySpark by defining custom functions in Python. These functions can then be used in DataFrame transformations.

Detailed Usage:

  1. Define a Python function.
  2. Wrap it as a UDF with F.udf, specifying its return type.
  3. Apply the UDF to a DataFrame column.
filename="transform.py"
from datazone import transform, Input, Dataset
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def apply_discount(orders):
    # Apply a 10% discount; null prices stay null instead of raising a TypeError
    calculate_discount_udf = F.udf(lambda price: None if price is None else float(price) * 0.9, FloatType())
    return orders.withColumn("discounted_price", calculate_discount_udf(F.col("price")))
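
Spark passes null column values to a Python UDF as None, so the wrapped function has to handle that case itself. Because the function is ordinary Python, it can be checked without a Spark session. The sketch below uses a hypothetical named function, calculate_discount, in place of the lambda:

```python
# The plain Python function behind a UDF can be checked without a Spark session.
def calculate_discount(price):
    """Apply a 10% discount; propagate missing prices as None."""
    if price is None:
        return None
    return float(price) * 0.9


print(calculate_discount(100))   # 90.0
print(calculate_discount(None))  # None

# Without the None check, a missing price fails:
try:
    None * 0.9
except TypeError as exc:
    print("TypeError:", exc)
```

A named function like this could then be wrapped with F.udf(calculate_discount, FloatType()) in the transform.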


Handling Missing Data

Dealing with null or missing data is a common task. PySpark provides functions to drop, fill, or replace these missing values.

Detailed Usage:

  • fillna(): Fills null values with specified value(s).
  • dropna(): Drops rows with null values.
filename="transform.py"
from datazone import transform, Input, Dataset

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def fill_missing_prices(orders):
    return orders.fillna({"price": 0})
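
The difference between filling and dropping can be sketched in plain Python. The rows below are hypothetical, and the two list comprehensions only model what fillna({"price": 0}) and dropna(subset=["price"]) would return; they are not Spark code.

```python
# Hypothetical rows; None stands for a null value.
rows = [
    {"order_id": 1, "price": 9.5, "coupon": None},
    {"order_id": 2, "price": None, "coupon": "SPRING"},
]

# Model of fillna({"price": 0}): only the listed column is filled.
filled = [
    {**row, "price": 0 if row["price"] is None else row["price"]} for row in rows
]

# Model of dropna(subset=["price"]): drop rows whose price is null.
dropped = [row for row in rows if row["price"] is not None]

print(filled[1])     # {'order_id': 2, 'price': 0, 'coupon': 'SPRING'}
print(filled[0])     # coupon is still None
print(len(dropped))  # 1
```

Filling keeps every row but changes the data, while dropping keeps the data intact but loses rows, so the right choice depends on how the column is used downstream.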


A Comprehensive Transform Example

To demonstrate a scenario where multiple PySpark operations are used within a single transform block, let’s consider a hypothetical analysis on an orders dataset. In this scenario, we will:

  1. Filter the orders for a specific year.
  2. Sort these orders by order date.
  3. Add a new column indicating the order size category.
  4. Calculate the total value of orders for each customer.
  5. Finally, pivot this data to show the sum of order values per month for each customer.

Here is how you can implement this using the transform decorator and datazone module:

filename="transform.py"
from datazone import transform, Input, Dataset
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def comprehensive_orders_analysis(orders):
    # Filter orders from a specific year
    filtered_orders = orders.filter(F.year("order_date") == 2023)

    # Sort by order date
    sorted_orders = filtered_orders.orderBy("order_date")

    # Define a UDF to categorize order size
    def order_size_category(order_value):
        if order_value < 100:
            return "Small"
        elif order_value <= 500:
            return "Medium"
        else:
            return "Large"

    order_size_udf = F.udf(order_size_category, StringType())
    categorized_orders = sorted_orders.withColumn("order_size", order_size_udf(F.col("order_value")))

    # Derive the month so that it is available as a pivot column
    orders_with_month = categorized_orders.withColumn("month", F.month("order_date"))

    # Aggregate total value of orders for each customer and month
    total_value_per_customer = orders_with_month.groupBy("customer_id", "month").agg(F.sum("order_value").alias("total_value"))

    # Pivot data to show sum of order values per month for each customer
    pivoted_data = total_value_per_customer.groupBy("customer_id").pivot("month").sum("total_value")

    return pivoted_data

Explanation of the Code:

  • Filtering: Only includes orders from the year 2023.
  • Sorting: Orders are sorted by their date.
  • UDF Application: A user-defined function categorizes each order based on its value.
  • Aggregation: The total value of orders is computed for each customer.
  • Pivoting: The data is then pivoted to show the sum of order values per month for each customer.

Note:

  • Ensure that dataset_orders_id is replaced with the actual dataset ID in your implementation.
  • The code assumes the presence of columns like order_date, order_value, etc., in your orders DataFrame.
  • UDFs might have performance implications; consider using built-in functions wherever possible.
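
The thresholds in order_size_category are easy to misread at the boundaries. Since the function is ordinary Python, it can be exercised on its own, as in this small check (the sample values are arbitrary):

```python
# Same logic as the categorization function used in the transform above.
def order_size_category(order_value):
    if order_value < 100:
        return "Small"
    elif order_value <= 500:
        return "Medium"
    else:
        return "Large"


# Check the values on either side of each threshold.
for value in (99.99, 100, 500, 500.01):
    print(value, order_size_category(value))
# 99.99 Small
# 100 Medium
# 500 Medium
# 500.01 Large
```

An order of exactly 100 counts as Medium, and an order of exactly 500 is still Medium; only values above 500 are Large.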