from datazone import transform

# Each decorated function is a transform in the pipeline.
@transform
def say_hello():
    print("Hello, World!")

@transform
def say_goodbye():
    print("Goodbye, World!")
  • Each pipeline should have a unique alias and be defined in its own file.
  • A pipeline should have at least one transform function.
  • You can define dependencies between transforms using the depends or input_mapping parameter. Together these dependencies must form a directed acyclic graph (DAG).
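
The acyclic requirement matters because a cycle leaves no step that can run first. The sketch below illustrates this with Python's standard graphlib module rather than with Datazone itself. The task names and the can_be_ordered helper are hypothetical and exist only for this illustration.

```python
from graphlib import CycleError, TopologicalSorter


def can_be_ordered(dependencies):
    """Return True if the dependency map is a DAG, False if it contains a cycle."""
    try:
        # static_order() raises CycleError when no valid ordering exists.
        list(TopologicalSorter(dependencies).static_order())
        return True
    except CycleError:
        return False


# Hypothetical task names; each key maps to the tasks it depends on.
acyclic = {"load": [], "clean": ["load"], "report": ["clean"]}
cyclic = {"load": ["report"], "clean": ["load"], "report": ["clean"]}

print(can_be_ordered(acyclic))
print(can_be_ordered(cyclic))
```

In the second map every task waits on another one, so no execution order exists.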

Complex Pipeline Example

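from datazone import transform

# Entry point of the pipeline: it has no dependencies.
@transform
def prepare():
    print("Preparing data...")

# Both build steps run after prepare.
@transform(depends=[prepare])
def build_project():
    print("Building project...")

@transform(depends=[prepare])
def build_report():
    print("Building report...")

# The notification runs only after both build steps have finished.
@transform(depends=[build_project, build_report])
def notify_email():
    print("Sending email notification...")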
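
To make the order implied by such a dependency graph concrete, the sketch below resolves it with Python's standard graphlib module. It is an illustration only and says nothing about how Datazone schedules work internally. It assumes that prepare runs before both build steps, and the execution_batches helper is a hypothetical name.

```python
from graphlib import TopologicalSorter

# Each key maps a step to the steps that must finish before it.
# The edges into notify_email mirror its depends list; the edges
# from prepare are an assumption made for this illustration.
dependencies = {
    "prepare": set(),
    "build_project": {"prepare"},
    "build_report": {"prepare"},
    "notify_email": {"build_project", "build_report"},
}


def execution_batches(graph):
    """Group steps into batches; steps in one batch do not depend on each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted for stable output
        batches.append(ready)
        sorter.done(*ready)
    return batches


for number, batch in enumerate(execution_batches(dependencies), start=1):
    print(number, batch)
# 1 ['prepare']
# 2 ['build_project', 'build_report']
# 3 ['notify_email']
```

Under that assumption the two build steps land in the same batch, which shows that they are independent of each other and that only notify_email has to wait for both.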

Data Flow Management

The @transform decorator enables you to define data transformation functions efficiently. Each function should:

  • Accept input data as arguments
  • Process the data
  • Return the transformed data
Data is handled as PySpark DataFrames both for input and output operations.
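from datazone import transform, Input, Dataset

# Read the dataset with alias 'input_data' and keep only rows with positive values.
@transform(input_mapping={'data': Input(Dataset(alias='input_data'))})
def clean_data(data):
    return data.filter(data['column'] > 0)

# Consume the output of clean_data; materialized=True stores the result as a new dataset.
@transform(input_mapping={'clean_data': Input(clean_data)}, materialized=True)
def aggregate_data(clean_data):
    return clean_data.groupBy('column').agg({'column': 'sum'})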

In the example above:

  1. The clean_data function takes the dataset with the alias input_data as input. You can check the dataset alias in the Datazone UI or use the datazone dataset list command to list all datasets.
  2. After cleaning the data, the clean_data function returns the result as a PySpark DataFrame, which is evaluated lazily.
  3. The aggregate_data function takes the cleaned data as input, aggregates it, and returns the aggregated data.
  4. Because the materialized parameter is set to True, the output of aggregate_data is materialized as a new dataset in Datazone.
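
The wiring described in these steps can be approximated in plain Python. The sketch below is not Datazone's implementation and does not use PySpark, so it runs eagerly rather than lazily. The names run_flow, clean_rows and sum_by_value are hypothetical, and lists of dicts stand in for DataFrames.

```python
# Stand-in for illustration only: none of these names come from the datazone package.
def run_flow(steps, sources):
    """Run steps in order, passing each step the results named in its mapping.

    steps:   list of (function, mapping) pairs, where mapping maps an
             argument name to the name of a source or of an earlier step.
    sources: dict of named input tables (lists of row dicts here).
    """
    results = dict(sources)
    for function, mapping in steps:
        kwargs = {arg: results[name] for arg, name in mapping.items()}
        results[function.__name__] = function(**kwargs)
    return results


def clean_rows(data):
    # Keep only rows with a positive value, mirroring the filter step.
    return [row for row in data if row["column"] > 0]


def sum_by_value(cleaned):
    # Group rows by the column's value and sum it, mirroring the aggregation step.
    totals = {}
    for row in cleaned:
        totals[row["column"]] = totals.get(row["column"], 0) + row["column"]
    return totals


sources = {"input_data": [{"column": 2}, {"column": -1}, {"column": 2}, {"column": 5}]}
steps = [
    (clean_rows, {"data": "input_data"}),
    (sum_by_value, {"cleaned": "clean_rows"}),
]
results = run_flow(steps, sources)
print(results["sum_by_value"])  # {2: 4, 5: 5}
```

Each mapping plays the role that input_mapping plays in the example: it names which earlier result is passed to which function argument.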
See the Transform section for more information on how to use the transform decorator.