Pipeline Examples

Basic Data Pipeline

Example 1: CSV to Processed Dataset

This example demonstrates uploading a CSV file to Datazone, performing basic transformations, and saving the results.

from datazone import transform, Input, Dataset
from pyspark.sql.functions import to_timestamp, sum, count, avg

# Upload the CSV to the project, then reference it by alias in the transform
@transform(
    input_mapping={
        "sales_raw": Input(Dataset(alias="sales_raw_csv"))
    }
)
def clean_sales_data(sales_raw):
    # Remove duplicates and null values
    df = sales_raw.dropDuplicates()
    df = df.na.drop()
    
    # Convert date string to timestamp
    df = df.withColumn("sale_date", to_timestamp("sale_date"))
    
    return df

# Calculate daily metrics
@transform(
    input_mapping={
        "clean_sales": Input(clean_sales_data)
    }
)
def calculate_daily_metrics(clean_sales):
    return clean_sales.groupBy("sale_date").agg(
        sum("amount").alias("daily_total"),
        count("*").alias("transaction_count"),
        avg("amount").alias("average_transaction")
    )
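
Because the aggregation logic is plain PySpark, it can be sanity-checked locally outside of Datazone. The sketch below is a minimal, hypothetical check: the local SparkSession and the sample rows are assumptions, not part of the pipeline.

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, count, avg

# Hypothetical local check: build a tiny DataFrame and apply the same aggregation
spark = SparkSession.builder.master("local[1]").appName("daily_metrics_check").getOrCreate()

sample = spark.createDataFrame(
    [("2024-01-01", 100.0), ("2024-01-01", 50.0), ("2024-01-02", 75.0)],
    ["sale_date", "amount"],
)

sample.groupBy("sale_date").agg(
    sum("amount").alias("daily_total"),
    count("*").alias("transaction_count"),
    avg("amount").alias("average_transaction"),
).show()
# Expected: 2024-01-01 -> 150.0 total, 2 transactions, 75.0 average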

Data Quality Pipeline

Example 2: Data Validation and Reporting

from datazone import transform, Input, Dataset
from pyspark.sql.functions import when, col

@transform(
    input_mapping={
        "customer_data": Input(Dataset(alias="customer_records"))
    }
)
def validate_customer_data(customer_data):
    # Check for required fields
    validation_df = customer_data.select(
        when(col("email").isNull(), "Missing Email")
        .when(~col("email").rlike("^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$"), "Invalid Email")
        .otherwise("Valid").alias("email_status"),
        when(col("phone").isNull(), "Missing Phone")
        .when(~col("phone").rlike("^\\+?[1-9]\\d{1,14}$"), "Invalid Phone")
        .otherwise("Valid").alias("phone_status")
    )
    
    return validation_df

@transform(
    input_mapping={
        "validation_results": Input(validate_customer_data)
    }
)
def generate_validation_report(validation_results):
    return validation_results.groupBy(
        "email_status", "phone_status"
    ).count()
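
The email and phone checks use only standard PySpark column expressions, so the rules can be tried on a few sample rows before wiring them into a pipeline. The snippet below is illustrative; the local SparkSession and the sample records are assumptions.

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

spark = SparkSession.builder.master("local[1]").appName("validation_check").getOrCreate()

# Hypothetical sample records covering valid, invalid, and missing emails
sample = spark.createDataFrame(
    [("a@example.com",), ("not-an-email",), (None,)],
    ["email"],
)

sample.select(
    col("email"),
    when(col("email").isNull(), "Missing Email")
    .when(~col("email").rlike("^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$"), "Invalid Email")
    .otherwise("Valid").alias("email_status"),
).show(truncate=False)
# Expected statuses: Valid, Invalid Email, Missing Email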

Multi-Source Pipeline

Example 3: Combining Data from Multiple Sources

from datazone import transform, Input, Dataset

@transform(
    input_mapping={
        "orders": Input(Dataset(alias="order_data")),
        "customers": Input(Dataset(alias="customer_data")),
        "products": Input(Dataset(alias="product_catalog"))
    }
)
def create_order_summary(orders, customers, products):
    # Join orders with customer data
    orders_with_customers = orders.join(
        customers,
        orders.customer_id == customers.id,
        "left"
    )
    
    # Join with product data
    complete_orders = orders_with_customers.join(
        products,
        orders.product_id == products.id,
        "left"
    )
    
    return complete_orders.select(
        "order_id",
        "order_date",
        "customer_name",
        "product_name",
        "quantity",
        "total_amount"
    )
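
Note that customers and products both expose an id column in this example, so the joined DataFrame can end up with ambiguous column names. One common way to keep the selection unambiguous is to alias each input and reference columns through the aliases. The sketch below is a hypothetical variant of the body of create_order_summary and assumes the columns live in the tables shown above.

from pyspark.sql.functions import col

# Hypothetical variant of create_order_summary using DataFrame aliases
o = orders.alias("o")
c = customers.alias("c")
p = products.alias("p")

complete_orders = (
    o.join(c, col("o.customer_id") == col("c.id"), "left")
     .join(p, col("o.product_id") == col("p.id"), "left")
     .select(
         col("o.order_id"),
         col("o.order_date"),
         col("c.customer_name"),
         col("p.product_name"),
         col("o.quantity"),
         col("o.total_amount"),
     )
)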

Logging Pipeline

Example 4: Pipeline with Comprehensive Logging

from datazone import transform, logger, Input, Dataset
from pyspark.sql.functions import col, count, when, sum

@transform(
    input_mapping={
        "sales_data": Input(Dataset(id="daily_sales"))
    }
)
def process_sales_with_logging(sales_data):
    try:
        logger.info(f"Starting sales data processing. Row count: {sales_data.count()}")
        
        # Log data quality metrics
        null_counts = sales_data.select([count(when(col(c).isNull(), c)).alias(c) 
                                       for c in sales_data.columns])
        logger.info(f"Null value counts: {null_counts.toPandas().to_dict()}")
        
        # Process data
        logger.info("Applying transformations...")
        processed_df = sales_data.filter(col("amount") > 0)
        
        # Calculate aggregates
        daily_totals = processed_df.groupBy("date").agg(
            sum("amount").alias("total_sales")
        )
        
        logger.info(f"Generated daily totals. Output rows: {daily_totals.count()}")
        return daily_totals
        
    except Exception as e:
        logger.error(f"Error processing sales data: {str(e)}")
        logger.error(f"Error details:", exc_info=True)  # Logs full stack trace
        raise
        
@transform(
    input_mapping={
        "daily_totals": Input(process_sales_with_logging)
    }
)
def validate_totals_with_logging(daily_totals):
    logger.info("Starting totals validation")
    
    try:
        # Validate results
        invalid_totals = daily_totals.filter(col("total_sales") < 0)
        invalid_count = invalid_totals.count()
        
        if invalid_count > 0:
            logger.warning(f"Found {invalid_count} invalid total(s)")
            logger.debug(f"Invalid records: {invalid_totals.collect()}")
        else:
            logger.info("All totals validated successfully")
            
        return daily_totals
        
    except Exception as e:
        logger.critical(f"Critical error in validation: {str(e)}")
        raise

Different logging levels are available; a short illustrative snippet follows the list:

  • logger.debug(): Detailed information for debugging
  • logger.info(): General information about pipeline progress
  • logger.warning(): Warning messages for potential issues
  • logger.error(): Error messages for caught exceptions
  • logger.critical(): Critical failures that require immediate attention
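
As a rough illustration of how these levels map to typical messages inside a transform (the df variable and the message texts are assumptions, not output from a real run):

logger.debug(f"Schema: {df.schema.simpleString()}")       # detailed diagnostic output
logger.info("Transformation step completed")              # routine progress
logger.warning("Input contains unexpected null values")   # potential issue, run continues
logger.error("Lookup dataset could not be joined")        # caught failure in this step
logger.critical("Upstream dataset is missing entirely")   # failure needing immediate attention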

Usage Instructions

  1. Save these transforms in your project’s transform directory
  2. Configure the dataset aliases and IDs to match your environment
  3. Create a pipeline including the transforms in the desired order
  4. Set up appropriate scheduling and monitoring