| Parameter Name | Default | Description |
| --- | --- | --- |
| compute_fn | - | The main computation function to be transformed. |
| name | - | Name of the transform function. If not provided, the function name is used. |
| description | - | Description of the transform function for documentation purposes. |
| materialized | False | If True, the output will be persisted/cached for reuse. |
| input_mapping | - | Dictionary defining input dependencies and their sources. Maps input parameter names to their corresponding datasets or transforms. |
| depends | - | List of transform functions that must complete before this transform can run. |
| partition_by | - | List of column names to partition the output data by. |
| output_mapping | - | Dictionary defining how the output should be mapped or stored. |
| tags | - | List of tags for categorizing and organizing transforms. |

Basic Transform Example

```python
from datazone import transform

@transform(name="say_hello", description="Prints 'Hello, World!'")
def say_hello():
    print("Hello, World!")
```

Materialization

Materialization is the process of storing the output of a transform function for reuse. This can be useful when a transform function is computationally expensive and its output is used multiple times in the pipeline.

Materialization is supported only for PySpark DataFrames, so the transform function must return a PySpark DataFrame.

```python
from datazone import transform

@transform(name="fetch_data", materialized=True)
def fetch_data(context):
    data = [("Alice", 9), ("Bob", 6), ("Charlie", 3), ("Maria", 7)]
    columns = ["Name", "Score"]

    spark = context.resources["pyspark"].spark
    return spark.createDataFrame(data, columns)
```

In the code above, we used `context.resources["pyspark"].spark` to access the PySpark session. For more information, see the Context section.

After running the pipeline, the output of the `fetch_data` function will be stored in Datazone as a dataset. You can check the dataset alias in the Datazone UI or use the `datazone dataset list` command to list all datasets.
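Once materialized, the dataset can feed downstream transforms through an input mapping (covered in the next section). A minimal sketch, assuming the materialized output of fetch_data is registered under the alias fetch_data (use the actual alias shown in the Datazone UI):

```python
from datazone import transform, Input, Dataset

@transform(input_mapping={"scores": Input(Dataset(alias="fetch_data"))})
def high_scores(scores):
    # Reads the materialized output instead of recomputing fetch_data
    return scores.filter(scores["Score"] > 5)
```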

Input Mapping

You can define input mappings to specify the data sources and dependencies for your transform functions. Input mappings enable you to:

  • Chain multiple transform functions
  • Create directed acyclic graphs (DAGs)
  • Connect to different data sources
  • Apply data transformations sequentially
The Input class accepts the following parameters:

| Parameter Name | Default | Description |
| --- | --- | --- |
| entity | - | The entity to be used as input. It can be a dataset or another transform function. |
| output_name | - | If you use a transform that has multiple outputs, you can specify the output name. |

Here are the common input mapping patterns:

```python
from datazone import transform, Input, Dataset

@transform(input_mapping={"data": Input(Dataset(alias="raw_data"))})
def clean_data(data):
    return data.filter(data["column"] > 0)


@transform(
    input_mapping={
        "clean_data": Input(clean_data),
        "another_data": Input(Dataset(alias="another_data")),
    }
)
def aggregate_data(clean_data, another_data):
    return (
        clean_data.join(another_data, "column")
        .groupBy("column")
        .agg({"column": "sum"})
        .select("column", "sum(column)")
    )
```
  • input_mapping should be defined as a dictionary where each key is an input parameter name and each value is an instance of the Input class.
  • The Input class accepts a Dataset or another transform function as its entity argument.

Output Mapping

Output mapping defines how the output of a transform function should be stored or mapped. You can specify the output mapping using the output_mapping parameter.

The Output class accepts the following parameters:

| Parameter Name | Default | Description |
| --- | --- | --- |
| dataset | - | The dataset where the output should be stored. |
| materialized | False | If True, the output will be stored as a materialized dataset. |
| partition_by | - | List of column names to partition the output data by. |
| mode | overwrite | The write mode for the output data. Options are overwrite and append. |
```python
from datazone import transform, Output, Dataset

@transform(output_mapping={"clean_data": Output(dataset=Dataset(alias="clean_data"))})
def clean_data(data):
    return data.filter(data["column"] > 0)
```
  • Output mapping should be defined as a dictionary where the key is the output parameter name and the value is an instance of the Output class.
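The Output parameters above can be combined in a single mapping. A minimal sketch, assuming they compose as described in the table (the events alias, column name, and function name are illustrative):

```python
from datazone import transform, Input, Output, Dataset

@transform(
    input_mapping={"events": Input(Dataset(alias="events"))},
    output_mapping={
        "daily_events": Output(
            dataset=Dataset(alias="daily_events"),  # illustrative alias
            materialized=True,
            partition_by=["date"],
            mode="append",  # add new rows instead of overwriting
        )
    },
)
def daily_events(events):
    return events.groupBy("date").count()
```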

Multiple Outputs

A transform can return multiple outputs as a tuple, with one output_mapping entry per returned value:

```python
from datazone import transform, Input, Output, Dataset

@transform(
    input_mapping={"orders": Input(Dataset(alias="orders"))},
    output_mapping={
        "daily_sales": Output(materialized=True),
        "monthly_sales": Output(materialized=True),
        "yearly_sales": Output(
            partition_by=["year"],
            materialized=True
        )
    }
)
def process_sales(orders):
    daily = orders.groupBy("date").agg(...)
    monthly = orders.groupBy("year", "month").agg(...)
    yearly = orders.groupBy("year").agg(...)

    return daily, monthly, yearly
```
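A downstream transform can then select one of these outputs by name using the output_name parameter of Input described in the Input Mapping section. A minimal sketch, assuming output names match the output_mapping keys above (the function name is illustrative):

```python
from datazone import transform, Input

@transform(input_mapping={"daily_sales": Input(process_sales, output_name="daily_sales")})
def daily_report(daily_sales):
    return daily_sales.orderBy("date")
```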

Partitioning

Partitioning helps organize and optimize your datasets in our data platform. When you create a transform function, you can specify partition columns using the partition_by parameter.

```python
from datazone import transform

@transform(
    partition_by=["date", "country"],
    materialized=True
)
def sales_by_country(data):
    return data.filter(...)

Why Partition?

  • Improve query performance when filtering by partition columns
  • Efficiently manage large datasets
  • Enable data retention policies by date partitions

Common Partition Strategies

```python
from datazone import transform

# Daily partitioning for time-series data
@transform(partition_by=["date"])
def daily_metrics(data): ...

# Geographic partitioning
@transform(partition_by=["country", "region"])
def regional_metrics(data): ...

# Multiple partition columns
@transform(partition_by=["date", "customer_type"])
def segmented_metrics(data): ...
```

All transformed datasets are stored in Delta Lake format. Choose partition columns based on your most common filtering needs, typically date-based or categorical columns with reasonable cardinality. If you partition by a high-cardinality column, it may lead to a large number of small files, which can impact query performance.
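One practical way to apply this guidance is to check a candidate column's distinct-value count before partitioning by it. A minimal PySpark sketch using the plain DataFrame API, independent of Datazone (partition_cardinality and orders are illustrative names):

```python
def partition_cardinality(df, column):
    """Count distinct values to sanity-check a candidate partition column."""
    distinct_count = df.select(column).distinct().count()
    print(f"{column}: {distinct_count} distinct values")
    return distinct_count

# A handful to a few hundred values is usually reasonable for a partition column;
# tens of thousands will produce many small files.
partition_cardinality(orders, "country")  # `orders` is an illustrative DataFrame
```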

Best Practices

  • Use date partitioning for time-series data
  • Avoid partitioning by columns with high cardinality
  • Consider your query patterns when choosing partition columns