# Transform

> Transform functions are the building blocks of your pipeline. Split your data processing into small, focused functions and chain them together to build a pipeline.

| Parameter Name      | Default | Description                                                                                                                         |
| ------------------- | ------- | ----------------------------------------------------------------------------------------------------------------------------------- |
| **compute\_fn**     | -       | The function that performs the computation (the decorated function).                                                              |
| **name**            | -       | Name of the transform function. If not provided, uses the function name.                                                            |
| **description**     | -       | Description of the transform function for documentation purposes.                                                                   |
| **materialized**    | False   | If True, the output will be persisted/cached for reuse.                                                                             |
| **input\_mapping**  | -       | Dictionary defining input dependencies and their sources. Maps input parameter names to their corresponding datasets or transforms. |
| **depends**         | -       | List of transform functions that must complete before this transform can run.                                                       |
| **partition\_by**   | -       | List of column names to partition the output data by.                                                                               |
| **output\_mapping** | -       | Dictionary defining how the output should be mapped or stored.                                                                      |
| **tags**            | -       | List of tags for categorizing and organizing transforms.                                                                            |
| **engine**          | pyspark | The computation engine to use. Options are `pyspark` or `pandas`.                                                                   |

### Basic Transform Example

```python theme={null}
from datazone import transform

@transform(name="say_hello", description="Prints 'Hello, World!'")
def say_hello():
    print("Hello, World!")
```
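
To make the parameter table concrete, the toy decorator below mimics a few of the documented defaults: `name` falls back to the function name, `materialized` defaults to `False`, and `engine` defaults to `"pyspark"`. It is a hypothetical stand-in written for illustration only; `toy_transform` and `ToyTransform` are invented names, and this is not how Datazone implements `@transform`.

```python theme={null}
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class ToyTransform:
    """Hypothetical container for a decorated function and its settings."""
    compute_fn: Callable
    name: str
    description: Optional[str] = None
    materialized: bool = False
    engine: str = "pyspark"
    tags: List[str] = field(default_factory=list)

    def __call__(self, *args, **kwargs):
        # Calling the transform simply delegates to the wrapped function
        return self.compute_fn(*args, **kwargs)


def toy_transform(name=None, description=None, materialized=False,
                  engine="pyspark", tags=None):
    """Hypothetical decorator factory mirroring a few documented defaults."""
    if engine not in ("pyspark", "pandas"):
        raise ValueError(f"unsupported engine: {engine!r}")

    def decorator(fn):
        # Fall back to the function's own name when `name` is omitted
        return ToyTransform(fn, name or fn.__name__, description,
                            materialized, engine, list(tags or []))

    return decorator


@toy_transform(description="Returns 'Hello, World!'")
def say_hello():
    return "Hello, World!"


print(say_hello.name)  # say_hello
print(say_hello())     # Hello, World!
```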

## Transform Engines

The transform decorator supports two computation engines: PySpark and Pandas. You can specify the engine using the `engine` parameter.

```python theme={null}
from datazone import transform, Input, Dataset

@transform(
    input_mapping={"user_data": Input(Dataset(id="66280b3f49ae018b4c0c904a"))},
    engine="pandas",
)
def filter_by_age(user_data):
    # With the pandas engine, user_data is a pandas DataFrame
    adult_users = user_data[user_data.age >= 18]
    return adult_users
```

<Warning>
  When using transforms with dependencies (via `input_mapping` or `depends`), all connected transforms must use the same engine.
  For example, if a transform uses the Pandas engine, all transforms it depends on or that depend on it must also use the Pandas engine.
</Warning>
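
The rule in the warning above can be expressed as a check over the dependency edges. The sketch below uses a hypothetical helper, `find_engine_mismatches` (not a Datazone function), that takes each transform's engine plus its upstream transforms and reports every edge that crosses engines.

```python theme={null}
from typing import Dict, List, Tuple


def find_engine_mismatches(
    engines: Dict[str, str], upstream: Dict[str, List[str]]
) -> List[Tuple[str, str]]:
    """Return (upstream, downstream) pairs whose engines differ."""
    mismatches = []
    for name, parents in upstream.items():
        for parent in parents:
            if engines[parent] != engines[name]:
                mismatches.append((parent, name))
    return mismatches


# Hypothetical pipeline: two pandas transforms feeding a PySpark one
engines = {"load_users": "pandas", "filter_by_age": "pandas", "summarize": "pyspark"}
upstream = {"filter_by_age": ["load_users"], "summarize": ["filter_by_age"]}

print(find_engine_mismatches(engines, upstream))  # [('filter_by_age', 'summarize')]
```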

### Engine Characteristics

* **PySpark (default)**:
  * Distributed processing capabilities
  * Better for large-scale data processing
  * Supports all materialization features
* **Pandas**:
  * Better for smaller datasets
  * More intuitive Python-native syntax
  * Ideal for local development and testing

## Materialization

Materialization is the process of storing the output of a transform function for reuse.
This can be useful when a transform function is computationally expensive and its output is used multiple times in the pipeline.

<Note>Materialization is supported for PySpark DataFrames only, so a materialized transform must return a PySpark DataFrame.</Note>

```python theme={null}
from datazone import transform

@transform(name="fetch_data", materialized=True)
def fetch_data(context):
    data = [("Alice", 9), ("Bob", 6), ("Charlie", 3), ("Maria", 7)]
    columns = ["Name", "Score"]

    spark = context.resources["pyspark"].spark
    return spark.createDataFrame(data, columns)
```

<Info>In the code above, we used `context.resources["pyspark"].spark` to access the PySpark session. For more information,
check the [Context](/reference/development/context) section.</Info>

After the pipeline runs, the output of the `fetch_data` function is stored in Datazone as a dataset.
You can find the dataset alias in the Datazone UI, or list all datasets with the `datazone dataset list` command.
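
The benefit described in this section comes from computing an expensive output once and reading the stored result afterwards. The toy model below uses a plain dict in place of persisted storage and counts how often the hypothetical `expensive_fetch` function runs when two downstream consumers read its output. It illustrates the idea only and does not describe how Datazone stores datasets.

```python theme={null}
# Toy model: a dict stands in for persisted (materialized) storage.
calls = {"count": 0}
store = {}


def expensive_fetch():
    calls["count"] += 1  # track how often the expensive work runs
    return [("Alice", 9), ("Bob", 6), ("Charlie", 3), ("Maria", 7)]


def read(name, compute, materialized):
    if not materialized:
        return compute()  # every consumer recomputes the output
    if name not in store:
        store[name] = compute()  # first consumer computes and stores it
    return store[name]  # later consumers reuse the stored result


def run_two_consumers(materialized):
    calls["count"] = 0
    store.clear()
    top = [n for n, s in read("fetch_data", expensive_fetch, materialized) if s >= 7]
    total = sum(s for _, s in read("fetch_data", expensive_fetch, materialized))
    return calls["count"], top, total


print(run_two_consumers(materialized=False))  # (2, ['Alice', 'Maria'], 25)
print(run_two_consumers(materialized=True))   # (1, ['Alice', 'Maria'], 25)
```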

## Input Mapping

You can define input mappings to specify the data sources and dependencies for your transform functions. Input mappings enable you to:

* Chain multiple transform functions
* Create directed acyclic graphs (DAGs)
* Connect to different data sources
* Apply data transformations sequentially

| Parameter Name   | Default | Description                                                                        |
| ---------------- | ------- | ---------------------------------------------------------------------------------- |
| **entity**       | -       | The entity to be used as input. It can be a dataset or another transform function. |
| **output\_name** | -       | When the entity is a transform with multiple outputs, the name of the output to use. |

Here are the common input mapping patterns:

```python theme={null}
from datazone import transform, Input, Dataset

# Read an existing dataset and keep rows where "column" is positive
@transform(input_mapping={"data": Input(Dataset(alias="raw_data"))})
def clean_data(data):
    return data.filter(data["column"] > 0)


# Consume the output of clean_data together with a second dataset
@transform(
    input_mapping={
        "clean_data": Input(clean_data),
        "another_data": Input(Dataset(alias="another_data")),
    }
)
def aggregate_data(clean_data, another_data):
    return (
        clean_data.join(another_data, "column")
        .groupBy("column")
        .agg({"column": "sum"})
        .select("column", "sum(column)")
    )
```

* Input mappings should be defined as a dictionary where the key is the input parameter name and the value is an instance of the `Input` class.
* The `Input` class accepts a `Dataset` or another transform function as an argument.

```mermaid theme={null}
graph LR
    subgraph Pipeline
        clean[clean_data<br/>filter column > 0] --> agg[aggregate_data<br/>join and groupBy]
    end

    data[(Dataset<br/><b>raw_data</b>)]-->clean
    another[(Dataset<br/><b>another_data</b>)]-->agg

    style Pipeline fill:#f0f0f0,stroke:#333,stroke-width:2px
    style data fill:#b8e994,stroke:#333,stroke-width:1px
    style another fill:#b8e994,stroke:#333,stroke-width:1px
```
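
Because every input mapping points from a transform to its upstream dataset or transform, the pipeline forms a DAG, and any valid run order is a topological sort of it. The sketch below models the example above with Python's standard-library `graphlib`. The `upstream` dict is a hand-written stand-in for the input mappings, not something Datazone exposes, and the sketch makes no claim about how Datazone schedules runs internally.

```python theme={null}
from graphlib import CycleError, TopologicalSorter

# Each node maps to the upstream nodes named in its input_mapping.
# Datasets (raw_data, another_data) have no upstream nodes.
upstream = {
    "clean_data": {"raw_data"},
    "aggregate_data": {"clean_data", "another_data"},
}

order = list(TopologicalSorter(upstream).static_order())
print(order)  # every node appears after all of its inputs


def is_acyclic(graph):
    """Return False if the mapping contains a cycle (i.e. is not a DAG)."""
    try:
        list(TopologicalSorter(graph).static_order())
        return True
    except CycleError:
        return False


print(is_acyclic(upstream))                  # True
print(is_acyclic({"a": {"b"}, "b": {"a"}}))  # False
```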

## Output Mapping

Output mapping defines how the output of a transform function should be stored or mapped.
You can specify the output mapping using the `output_mapping` parameter.

| Parameter Name    | Default   | Description                                                            |
| ----------------- | --------- | ---------------------------------------------------------------------- |
| **dataset**       | -         | The dataset where the output should be stored.                         |
| **materialized**  | False     | If True, the output will be stored as a materialized dataset.          |
| **partition\_by** | -         | List of column names to partition the output data by.                  |
| **mode**          | overwrite | The write mode for the output data. Options are `overwrite`, `append`. |

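```python theme={null}
from datazone import transform, Input, Output, Dataset

@transform(
    input_mapping={"data": Input(Dataset(alias="raw_data"))},
    # The key names the output; the Output describes where it is written
    output_mapping={"clean_data": Output(dataset=Dataset(alias="clean_data"))},
)
def clean_data(data):
    return data.filter(data["column"] > 0)
```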

* Output mapping should be defined as a dictionary where the key is the output name and the value is an instance of the `Output` class.

### Multiple Outputs

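```python theme={null}
from pyspark.sql import functions as F
from datazone import transform, Input, Output, Dataset

@transform(
    input_mapping={"orders": Input(Dataset(alias="orders"))},
    output_mapping={
        "daily_sales": Output(materialized=True),
        "monthly_sales": Output(materialized=True),
        "yearly_sales": Output(
            partition_by=["year"],
            materialized=True
        )
    }
)
def process_sales(orders):
    # "amount" is an illustrative column name; use your own measure column
    daily = orders.groupBy("date").agg(F.sum("amount").alias("total"))
    monthly = orders.groupBy("year", "month").agg(F.sum("amount").alias("total"))
    yearly = orders.groupBy("year").agg(F.sum("amount").alias("total"))

    # Return one DataFrame per output, in the same order as output_mapping
    return daily, monthly, yearly
```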

```mermaid theme={null}
graph LR
    subgraph Transform
        process_sales
    end

    orders[(orders)]-->process_sales
    process_sales-->daily[(daily_sales)]
    process_sales-->monthly[(monthly_sales)]
    process_sales-->yearly[(yearly_sales)]
```
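
The example above returns a tuple whose order matches the keys of `output_mapping`. The sketch below shows that pairing under the assumption that outputs are matched by position. `bind_outputs` is a hypothetical helper, not part of the Datazone API, and plain strings stand in for DataFrames.

```python theme={null}
def bind_outputs(output_names, result):
    """Pair each returned value with its output name, by position."""
    if not isinstance(result, tuple):
        result = (result,)  # a single return value becomes a 1-tuple
    if len(result) != len(output_names):
        raise ValueError(
            f"expected {len(output_names)} outputs, got {len(result)}"
        )
    return dict(zip(output_names, result))


# Python dicts preserve insertion order, so the key order below is stable.
# None stands in for the Output(...) objects of the real example.
output_mapping = {"daily_sales": None, "monthly_sales": None, "yearly_sales": None}

bound = bind_outputs(list(output_mapping), ("daily_df", "monthly_df", "yearly_df"))
print(bound["monthly_sales"])  # monthly_df
```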

## Transform Hooks

Transform hooks let you run custom logic when a transform function succeeds or fails.
Register a hook by decorating a function with the transform's `on_success` or `on_failure` attribute; the hook receives a `context` object.

```python theme={null}
from datazone import transform

@transform(name="example_transform")
def generate_sales_data():
    return [("Alice", 100), ("Bob", 150), ("Charlie", 200)]


@generate_sales_data.on_success
def log_success(context):
    print(f"Transform {context.state.name} completed successfully.")


@generate_sales_data.on_failure
def log_failure(context):
    print(f"Transform {context.state.name} failed with error: {context.error}")
```
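
To show how decorator-registered hooks behave, here is a minimal stand-in. `HookedTransform` is hypothetical and only mimics the `context.state.name` and `context.error` attributes used above; it is not Datazone's implementation, and the real context object is described in the Context section.

```python theme={null}
from types import SimpleNamespace


class HookedTransform:
    """Hypothetical transform wrapper with on_success / on_failure hooks."""

    def __init__(self, fn, name):
        self.fn, self.name = fn, name
        self._success, self._failure = [], []

    def on_success(self, hook):
        self._success.append(hook)  # register and return the hook unchanged
        return hook

    def on_failure(self, hook):
        self._failure.append(hook)
        return hook

    def run(self, *args):
        # Minimal context exposing only state.name and error
        context = SimpleNamespace(state=SimpleNamespace(name=self.name), error=None)
        try:
            result = self.fn(*args)
        except Exception as exc:
            context.error = exc
            for hook in self._failure:
                hook(context)
            raise  # hooks observe the failure; they do not swallow it
        for hook in self._success:
            hook(context)
        return result


events = []
sales = HookedTransform(lambda: [("Alice", 100), ("Bob", 150)], "example_transform")


@sales.on_success
def log_success(context):
    events.append(f"{context.state.name} succeeded")


sales.run()
print(events)  # ['example_transform succeeded']
```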

## Partitioning

Partitioning splits a dataset's stored files by the values of one or more columns, which helps organize and optimize datasets in Datazone.
When you create a transform function, specify the partition columns with the `partition_by` parameter.

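```python theme={null}
from datazone import transform, Input, Dataset

@transform(
    input_mapping={"data": Input(Dataset(alias="sales"))},  # illustrative alias
    partition_by=["date", "country"],
    materialized=True
)
def sales_by_country(data):
    # Drop rows without a country so every row lands in a valid partition
    return data.filter(data["country"].isNotNull())
```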

### Why Partition?

* Improve query performance when filtering by partition columns
* Efficiently manage large datasets
* Enable data retention policies by date partitions
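
Partitioned Delta Lake tables store their files in Hive-style directories named `column=value`, so a filter on a partition column can skip every non-matching directory. The stdlib sketch below groups a few sample rows (invented for illustration) into those directory paths for `partition_by=["date", "country"]`, then selects only the paths a `country = 'US'` filter would need to read.

```python theme={null}
from collections import defaultdict

# Sample rows, invented for illustration
rows = [
    {"date": "2024-01-01", "country": "US", "amount": 10},
    {"date": "2024-01-01", "country": "DE", "amount": 7},
    {"date": "2024-01-02", "country": "US", "amount": 5},
]
partition_by = ["date", "country"]

# Group rows by their Hive-style partition path, e.g. "date=.../country=..."
partitions = defaultdict(list)
for row in rows:
    path = "/".join(f"{col}={row[col]}" for col in partition_by)
    partitions[path].append(row)

# A filter on a partition column only needs the matching directories
wanted = [p for p in partitions if "country=US" in p.split("/")]

print(sorted(partitions))
print(wanted)
```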

### Common Partition Strategies

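```python theme={null}
from datazone import transform

# input_mapping is omitted for brevity; function names are illustrative

# Daily partitioning for time-series data
@transform(partition_by=["date"])
def daily_events(data):
    return data

# Geographic partitioning
@transform(partition_by=["country", "region"])
def regional_sales(data):
    return data

# Multiple partition columns
@transform(partition_by=["date", "customer_type"])
def customers_by_day(data):
    return data
```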

<Note>
  All transformed datasets are stored in Delta Lake format. Choose partition columns based on your most common filters, typically date-based or categorical columns with low to moderate cardinality.
  Partitioning by a high-cardinality column can produce a large number of small files, which slows down queries.
</Note>
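
One way to apply this advice is to measure cardinality before choosing `partition_by`. The sketch below works on plain Python rows (sample data invented for illustration): a `distinct_ratio` near 1.0 means roughly one partition per row, and `count_partitions` gives the number of partition directories a column combination would create. Both helpers are hypothetical; on real data you would compute the same counts with your engine, for example with `countDistinct` in PySpark or `nunique` in pandas.

```python theme={null}
# Sample rows, invented for illustration
rows = [
    {
        "date": "2024-01-01" if i < 5 else "2024-01-02",
        "country": "US" if i % 2 == 0 else "DE",
        "user_id": f"user-{i}",
    }
    for i in range(10)
]


def distinct_ratio(rows, column):
    """Distinct values / row count; close to 1.0 means ~one partition per row."""
    return len({r[column] for r in rows}) / len(rows)


def count_partitions(rows, columns):
    """Number of distinct partition-value combinations (= partition directories)."""
    return len({tuple(r[c] for c in columns) for r in rows})


for col in ("date", "country", "user_id"):
    print(col, distinct_ratio(rows, col))
# date 0.2
# country 0.2
# user_id 1.0

print(count_partitions(rows, ["date", "country"]))  # 4
print(count_partitions(rows, ["user_id"]))          # 10
```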

### Best Practices

* Use date partitioning for time-series data
* Avoid partitioning by columns with high cardinality
* Consider your query patterns when choosing partition columns

## Generator Transforms

Generator transforms allow you to yield data multiple times from a single transform function. This is useful for processing data in chunks, creating multiple batches from a single input, or streaming incremental results.

### Usage

Use Python's `yield` statement to return data in chunks. Each yielded value creates a separate batch that gets written to the output dataset.

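```python theme={null}
from datazone import transform, Input, Output, Dataset

@transform(
    input_mapping={"user_data": Input(Dataset(alias="users"))},
    output_mapping={
        "processed_data": Output(
            dataset=Dataset(alias="processed-users"), materialized=True, mode="append"
        )
    },
)
def process_in_chunks(user_data):
    # Split data into chunks of 1000 rows. Sort by a unique key first so each
    # chunk is deterministic ("id" is an assumed column name).
    chunk_size = 1000
    ordered = user_data.orderBy("id")
    total_rows = ordered.count()

    for offset in range(0, total_rows, chunk_size):
        # Skip `offset` rows, then take the next `chunk_size` rows
        chunk = ordered.offset(offset).limit(chunk_size)
        processed_chunk = chunk.filter(chunk.age >= 18)
        yield processed_chunk
```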

```mermaid theme={null}
graph TD
    input[(users<br/>10,000 rows)] --> start[Start Transform]
    start --> loop{For each chunk}
    loop -->|offset 0-999| process1[Process & Filter<br/>chunk 1]
    loop -->|offset 1000-1999| process2[Process & Filter<br/>chunk 2]
    loop -->|offset 2000-2999| process3[Process & Filter<br/>chunk 3]
    loop -->|offset N| processN[Process & Filter<br/>chunk N]

    process1 -->|yield| output1[(Append to<br/>processed-users)]
    process2 -->|yield| output2[(Append to<br/>processed-users)]
    process3 -->|yield| output3[(Append to<br/>processed-users)]
    processN -->|yield| outputN[(Append to<br/>processed-users)]

    output1 --> check1{More chunks?}
    output2 --> check2{More chunks?}
    output3 --> check3{More chunks?}
    outputN --> end1[End]

    check1 -->|Yes| loop
    check2 -->|Yes| loop
    check3 -->|Yes| loop
```
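
The chunking arithmetic can be checked without Spark. In this sketch a Python list of sample users (invented for illustration) stands in for the DataFrame, and slicing `users[offset:offset + chunk_size]` plays the role of skipping `offset` rows and taking the next `chunk_size` rows. The last chunk is simply shorter when the row count is not a multiple of the chunk size.

```python theme={null}
def process_in_chunks(users, chunk_size=1000):
    """Yield one filtered batch per chunk of `chunk_size` rows."""
    for offset in range(0, len(users), chunk_size):
        chunk = users[offset:offset + chunk_size]  # rows offset .. offset+chunk_size-1
        yield [u for u in chunk if u["age"] >= 18]


# 2,500 sample users with ages cycling through 0..39
users = [{"id": i, "age": i % 40} for i in range(2500)]

batches = list(process_in_chunks(users))
print(len(batches))               # 3
print([len(b) for b in batches])  # [550, 550, 266]
```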

<Warning>
  **Multiple Transactions**: Each `yield` creates a separate transaction. If your transform fails after some yields have succeeded, the already-written batches remain in the output dataset. Design reruns to be idempotent so that a retry does not append the same batch twice.
</Warning>
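
One way to make reruns safe is to give each yielded batch a deterministic ID and skip IDs that are already present in the output. The sketch below is hypothetical: `append_batches`, `flaky_batches`, and the `batch_id` field are invented for illustration, a list stands in for the output dataset, and it assumes a rerun yields the same batches in the same order.

```python theme={null}
def append_batches(target, batches, run_key):
    """Append each batch once, tagged with a deterministic batch_id."""
    done = {row["batch_id"] for row in target}
    for i, batch in enumerate(batches):
        batch_id = f"{run_key}-{i}"
        if batch_id in done:
            continue  # already written by a previous, partially failed attempt
        target.extend({**row, "batch_id": batch_id} for row in batch)
    return target


def flaky_batches(fail):
    """Yield two batches, optionally failing after the first one."""
    yield [{"v": 1}]
    if fail:
        raise RuntimeError("simulated failure")
    yield [{"v": 2}]


table = []
try:
    append_batches(table, flaky_batches(fail=True), "2024-01-01")
except RuntimeError:
    pass  # the first batch was already appended before the failure

append_batches(table, flaky_batches(fail=False), "2024-01-01")
print([r["v"] for r in table])  # [1, 2]
```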

<Warning>
  **Mode Must Be Append**: Generator transforms require `mode="append"` in the output mapping. Using `mode="overwrite"` will cause each yielded batch to overwrite the previous one, leaving only the last batch in the output dataset.
</Warning>
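
The difference between the two write modes can be seen in a toy model where a list stands in for the output dataset and each yielded batch is written in turn. `write` and `run_generator` are illustrative names, not Datazone functions.

```python theme={null}
def write(dataset, batch, mode):
    """Toy write: a list stands in for the output dataset."""
    if mode == "overwrite":
        dataset[:] = batch      # replace the existing contents
    elif mode == "append":
        dataset.extend(batch)   # keep existing contents, add the new rows
    else:
        raise ValueError(f"unknown mode: {mode}")


def run_generator(mode):
    dataset = []
    for batch in ([1, 2], [3, 4], [5]):  # three yielded batches
        write(dataset, batch, mode)
    return dataset


print(run_generator("overwrite"))  # [5]
print(run_generator("append"))     # [1, 2, 3, 4, 5]
```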
