Pipeline Examples
Basic Data Pipeline
Example 1: CSV to Processed Dataset
This example demonstrates uploading a CSV file to Datazone, applying basic transformations, and saving the results.
from datazone import transform, Input, Dataset
from pyspark.sql.functions import avg, count, sum, to_timestamp

# Upload data to the project and then use it in the transform
@transform(
    input_mapping={
        "sales_raw": Input(Dataset(alias="sales_raw_csv"))
    }
)
def clean_sales_data(sales_raw):
    # Remove duplicates and null values
    df = sales_raw.dropDuplicates()
    df = df.na.drop()

    # Convert date string to timestamp
    df = df.withColumn("sale_date", to_timestamp("sale_date"))
    return df

# Calculate daily metrics
@transform(
    input_mapping={
        "clean_sales": Input(clean_sales_data)
    }
)
def calculate_daily_metrics(clean_sales):
    return clean_sales.groupBy("sale_date").agg(
        sum("amount").alias("daily_total"),
        count("*").alias("transaction_count"),
        avg("amount").alias("average_transaction")
    )
Data Quality Pipeline
Example 2: Data Validation and Reporting
from datazone import transform, Input, Dataset
from pyspark.sql.functions import col, when

@transform(
    input_mapping={
        "customer_data": Input(Dataset(alias="customer_records"))
    }
)
def validate_customer_data(customer_data):
    # Check for required fields
    validation_df = customer_data.select(
        when(col("email").isNull(), "Missing Email")
        .when(~col("email").rlike("^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$"), "Invalid Email")
        .otherwise("Valid").alias("email_status"),
        when(col("phone").isNull(), "Missing Phone")
        .when(~col("phone").rlike("^\\+?[1-9]\\d{1,14}$"), "Invalid Phone")
        .otherwise("Valid").alias("phone_status")
    )
    return validation_df

@transform(
    input_mapping={
        "validation_results": Input(validate_customer_data)
    }
)
def generate_validation_report(validation_results):
    return validation_results.groupBy(
        "email_status", "phone_status"
    ).count()
Multi-Source Pipeline
Example 4: Combining Data from Multiple Sources
from datazone import transform, Input, Dataset

@transform(
    input_mapping={
        "orders": Input(Dataset(alias="order_data")),
        "customers": Input(Dataset(alias="customer_data")),
        "products": Input(Dataset(alias="product_catalog"))
    }
)
def create_order_summary(orders, customers, products):
    # Join orders with customer data
    orders_with_customers = orders.join(
        customers,
        orders.customer_id == customers.id,
        "left"
    )

    # Join with product data
    complete_orders = orders_with_customers.join(
        products,
        orders.product_id == products.id,
        "left"
    )

    return complete_orders.select(
        "order_id",
        "order_date",
        "customer_name",
        "product_name",
        "quantity",
        "total_amount"
    )
Logging Pipeline
Example 6: Pipeline with Comprehensive Logging
from datazone import transform, logger, Input, Dataset
from pyspark.sql.functions import col, count, sum, when

@transform(
    input_mapping={
        "sales_data": Input(Dataset(id="daily_sales"))
    }
)
def process_sales_with_logging(sales_data):
    try:
        logger.info(f"Starting sales data processing. Row count: {sales_data.count()}")

        # Log data quality metrics
        null_counts = sales_data.select([count(when(col(c).isNull(), c)).alias(c)
                                         for c in sales_data.columns])
        logger.info(f"Null value counts: {null_counts.toPandas().to_dict()}")

        # Process data
        logger.info("Applying transformations...")
        processed_df = sales_data.filter(col("amount") > 0)

        # Calculate aggregates
        daily_totals = processed_df.groupBy("date").agg(
            sum("amount").alias("total_sales")
        )
        logger.info(f"Generated daily totals. Output rows: {daily_totals.count()}")

        return daily_totals
    except Exception as e:
        logger.error(f"Error processing sales data: {str(e)}")
        logger.error("Error details:", exc_info=True)  # Logs the full stack trace
        raise

@transform(
    input_mapping={
        "daily_totals": Input(process_sales_with_logging)
    }
)
def validate_totals_with_logging(daily_totals):
    logger.info("Starting totals validation")
    try:
        # Validate results
        invalid_totals = daily_totals.filter(col("total_sales") < 0)
        invalid_count = invalid_totals.count()

        if invalid_count > 0:
            logger.warning(f"Found {invalid_count} invalid total(s)")
            logger.debug(f"Invalid records: {invalid_totals.collect()}")
        else:
            logger.info("All totals validated successfully")

        return daily_totals
    except Exception as e:
        logger.critical(f"Critical error in validation: {str(e)}")
        raise
Different logging levels are available:
- logger.debug(): Detailed information for debugging
- logger.info(): General information about pipeline progress
- logger.warning(): Warning messages for potential issues
- logger.error(): Error messages for caught exceptions
- logger.critical(): Critical failures that require immediate attention
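As a minimal sketch of when each level fits (assuming the datazone logger follows standard Python logging semantics; the transform name, the "inventory" dataset alias, and the "stock" column are placeholders, not part of the examples above):

from datazone import transform, logger, Input, Dataset
from pyspark.sql.functions import col

@transform(
    input_mapping={
        "inventory": Input(Dataset(alias="inventory"))  # placeholder alias
    }
)
def check_inventory_with_logging(inventory):
    logger.debug(f"Columns available: {inventory.columns}")  # fine-grained detail for debugging
    logger.info("Starting inventory check")                  # normal pipeline progress

    try:
        negative_rows = inventory.filter(col("stock") < 0).count()
        if negative_rows > 0:
            logger.warning(f"Found {negative_rows} rows with negative stock")  # potential issue; run continues
        return inventory.filter(col("stock") >= 0)
    except Exception as e:
        logger.error(f"Inventory check failed: {e}")                           # caught exception
        logger.critical("Inventory data is unusable; aborting pipeline")       # failure needing immediate attention
        raise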
Usage Instructions
- Save these transforms in your project’s transform directory
- Configure the dataset aliases and IDs to match your environment
- Create a pipeline that includes the transforms in the desired order (see the sketch below)
- Set up appropriate scheduling and monitoring
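The execution order in the examples above comes from chaining: passing an upstream transform to Input() in a downstream transform's input_mapping makes the downstream step run after it. A minimal sketch of that ordering (extract_events, summarize_events, the raw_events alias, and the event_type column are hypothetical names, not from the examples above):

from datazone import transform, Input, Dataset

@transform(input_mapping={"events": Input(Dataset(alias="raw_events"))})
def extract_events(events):
    # First step: read the source dataset and drop rows with null values
    return events.na.drop()

@transform(input_mapping={"clean_events": Input(extract_events)})
def summarize_events(clean_events):
    # Second step: runs after extract_events because it consumes its output
    return clean_events.groupBy("event_type").count()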