Simple pipeline
This guide walks you through a three-job data pipeline that fetches, processes, and publishes data on a schedule. You will learn how to define inputs, set up cron triggers, use environment variables, and chain jobs with dependencies.
Prerequisites
- H2O AI Cloud access (see Access H2O Workflows).
- Python SDK installed: pip install h2o-workflows
- Two H2O Drive buckets: one for source data and one for output data.
Step 1: Understand the workflow
The simple-pipeline workflow runs three jobs in sequence: fetch-data downloads and validates the raw data, process transforms it, and publish makes the results available. Two cron schedules trigger the pipeline automatically at 6 AM and 6 PM UTC, each with its own bucket configuration.
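Before reading the YAML in detail, it helps to see the skeleton of the jobs block. The sketch below is a simplified outline only; the complete definitions follow in Step 2:

jobs:
  fetch-data:                      # no dependencies, runs first
    # download, validate, upload
  process:
    depends_on: [fetch-data]       # waits for fetch-data to complete
    # transform the validated data
  publish:
    depends_on: [process]          # waits for process to complete
    # publish the processed results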
Step 2: Walk through the YAML
Inputs
The workflow defines two required string inputs: data_bucket and output_bucket. These are the UUIDs of your H2O Drive buckets.
inputs:
  data_bucket:
    type: string
    required: true
    description: "Source data bucket UUID"
  output_bucket:
    type: string
    required: true
    description: "Output data bucket UUID"
Inputs are typed parameters you pass at runtime or bind in triggers. See Inputs syntax reference for the full input schema.
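All three fields shown above (type, required, description) come from that schema. As an illustrative sketch, a workflow could also declare an input that callers may omit by marking it required: false; the input name notify_channel here is hypothetical and not part of this example:

inputs:
  notify_channel:
    type: string
    required: false               # callers may omit this input
    description: "Optional channel to notify when the pipeline finishes"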
Triggers
The workflow uses two schedule entries, each providing its own input values:
trigger:
  schedule:
    - cron: "0 6 * * *"
      inputs:
        data_bucket: "f47ac10b-58cc-4372-a567-0e02b2c3d479"
        output_bucket: "8f4e3c2a-1b5d-4e8f-9a7c-6d3e2f1a0b9c"
    - cron: "0 18 * * *"
      inputs:
        data_bucket: "a1b2c3d4-e5f6-7890-1234-56789abcdef0"
        output_bucket: "b2c3d4e5-f6a7-8901-2345-6789abcdef01"
Because each entry binds its own input values, the same workflow runs against different buckets at different times: the 6 AM run uses one pair of buckets, the 6 PM run another. See Triggers syntax reference.
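Cron expressions use the standard five fields: minute, hour, day of month, month, and day of week. As an illustrative sketch (not part of this example), a weekday-only schedule for the first pair of buckets would look like this:

trigger:
  schedule:
    - cron: "0 6 * * 1-5"         # 6 AM UTC, Monday through Friday only
      inputs:
        data_bucket: "f47ac10b-58cc-4372-a567-0e02b2c3d479"
        output_bucket: "8f4e3c2a-1b5d-4e8f-9a7c-6d3e2f1a0b9c"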
Environment variables
The env block maps input values to environment variables available across all jobs:
env:
  DATA_BUCKET: ${{ .inputs.data_bucket }}
  OUTPUT_BUCKET: ${{ .inputs.output_bucket }}
  SCRIPTS_REPO: "https://github.com/h2oai/data-pipeline"
The ${{ .inputs.data_bucket }} syntax is a Go template expression that resolves to the runtime input value. See Environment Variables and Expressions.
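Because the env block exposes these values as ordinary environment variables, run steps can read them with plain shell syntax, as the jobs below do with $DATA_BUCKET and $OUTPUT_BUCKET. A hypothetical debug step, for example, could print the resolved values:

- name: Show resolved configuration
  run: echo "Reading from $DATA_BUCKET, writing to $OUTPUT_BUCKET"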
Jobs and dependencies
The workflow defines three jobs that form a linear pipeline:
- fetch-data: no dependencies, runs first.
- process: depends on fetch-data, runs after it completes.
- publish: depends on process, runs last.
Here is the process job definition:
process:
  name: Process data
  depends_on: [fetch-data]
  runner: cpu-large
  timeout: "1h"
Each job specifies a runner size to control compute resources and a timeout for maximum execution time. See Jobs syntax reference.
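depends_on takes a list, so pipelines are not limited to a straight line. As a sketch (assuming multiple entries are accepted, as the list syntax suggests; this job is not part of the example), a report job could wait for both fetch-data and process:

report:
  name: Build summary report
  depends_on: [fetch-data, process]   # runs only after both jobs complete
  runner: cpu-small
  timeout: "15m"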
Steps
Each job contains a sequence of steps. The fetch-data job uses three step types: download to fetch data from H2O Drive, run to execute shell commands, and upload to store results:
fetch-data:
  name: Fetch data from source
  runner: cpu-medium
  timeout: "30m"
  steps:
    - name: Download raw data
      download:
        source: drive://$DATA_BUCKET/raw-data/dataset.csv
        path: ./data/input.csv
    - name: Clone scripts
      run: git clone --depth 1 $SCRIPTS_REPO scripts
    - name: Install dependencies
      run: pip install -r scripts/requirements.txt
    - name: Validate data
      run: python scripts/validate.py --input data/input.csv
    - name: Upload validated data
      upload:
        path: data/input.csv
        destination: drive://$OUTPUT_BUCKET/validated/dataset.csv
See Steps syntax reference and Storage for full documentation on download, upload, and run step types.
Step 3: Deploy with the Python SDK
Once you have the YAML file, use the Python SDK to create and activate the workflow:
import h2o_workflows
from h2o_workflows.workflow.workflow import Workflow

# Connect to H2O AI Cloud
clients = h2o_workflows.login()

# Read the workflow YAML
with open("examples/simple-pipeline.yaml") as f:
    source = f.read()

# Create the workflow in your workspace
workflow = clients.workflow.create_workflow(
    parent="workspaces/my-workspace",
    workflow=Workflow(source_contents=source),
)
print(f"Created: {workflow.name}")

# Activate the workflow to enable scheduled triggers
clients.workflow.activate_workflow(name=workflow.name)
print("Workflow activated -- triggers are now live")
This creates the workflow and activates it so the cron triggers start firing. For the full client API, see the Python SDK Reference.
Complete YAML
id: simple-pipeline
name: Simple Pipeline

inputs:
  data_bucket:
    type: string
    required: true
    description: "Source data bucket UUID"
  output_bucket:
    type: string
    required: true
    description: "Output data bucket UUID"

trigger:
  schedule:
    - cron: "0 6 * * *"
      inputs:
        data_bucket: "f47ac10b-58cc-4372-a567-0e02b2c3d479"
        output_bucket: "8f4e3c2a-1b5d-4e8f-9a7c-6d3e2f1a0b9c"
    - cron: "0 18 * * *"
      inputs:
        data_bucket: "a1b2c3d4-e5f6-7890-1234-56789abcdef0"
        output_bucket: "b2c3d4e5-f6a7-8901-2345-6789abcdef01"

env:
  DATA_BUCKET: ${{ .inputs.data_bucket }}
  OUTPUT_BUCKET: ${{ .inputs.output_bucket }}
  SCRIPTS_REPO: "https://github.com/h2oai/data-pipeline"

jobs:
  fetch-data:
    name: Fetch data from source
    runner: cpu-medium
    timeout: "30m"
    steps:
      - name: Download raw data
        download:
          source: drive://$DATA_BUCKET/raw-data/dataset.csv
          path: ./data/input.csv
      - name: Clone scripts
        run: git clone --depth 1 $SCRIPTS_REPO scripts
      - name: Install dependencies
        run: pip install -r scripts/requirements.txt
      - name: Validate data
        run: python scripts/validate.py --input data/input.csv
      - name: Upload validated data
        upload:
          path: data/input.csv
          destination: drive://$OUTPUT_BUCKET/validated/dataset.csv

  process:
    name: Process data
    depends_on: [fetch-data]
    runner: cpu-large
    timeout: "1h"
    steps:
      - name: Download validated data
        download:
          source: drive://$OUTPUT_BUCKET/validated/dataset.csv
          path: ./data/input.csv
      - name: Clone scripts
        run: git clone --depth 1 $SCRIPTS_REPO scripts
      - name: Install dependencies
        run: pip install -r scripts/requirements.txt
      - name: Run processing
        run: python scripts/process.py --input data/input.csv --output data/output.csv
      - name: Upload processed data
        upload:
          path: data/output.csv
          destination: drive://$OUTPUT_BUCKET/processed/dataset.csv

  publish:
    name: Publish results
    depends_on: [process]
    runner: cpu-small
    timeout: "15m"
    steps:
      - name: Download processed data
        download:
          source: drive://$OUTPUT_BUCKET/processed/dataset.csv
          path: ./data/processed.csv
      - name: Clone scripts
        run: git clone --depth 1 $SCRIPTS_REPO scripts
      - name: Install dependencies
        run: pip install -r scripts/requirements.txt
      - name: Publish data
        run: python scripts/publish.py --input data/processed.csv
      - name: Upload final results
        upload:
          path: data/processed.csv
          destination: drive://$OUTPUT_BUCKET/published/dataset.csv
Next steps
- Matrix Data Processing — Learn parallel execution with matrix jobs.
- Reusable Workflow — Create callable workflows.