Simple pipeline

This guide walks you through a three-job data pipeline that fetches, processes, and publishes data on a schedule. You will learn how to define inputs, set up cron triggers, use environment variables, and chain jobs with dependencies.

Prerequisites

Step 1: Understand the workflow

The simple-pipeline workflow runs three jobs in sequence: fetch-data downloads and validates the raw data, process transforms it, and publish makes the results available. Two cron schedules trigger the pipeline automatically at 6 AM and 6 PM UTC, each with its own bucket configuration.

Step 2: Walk through the YAML

Inputs

The workflow defines two required string inputs: data_bucket and output_bucket. These are the UUIDs of your H2O Drive buckets.

inputs:
  data_bucket:
    type: string
    required: true
    description: "Source data bucket UUID"
  output_bucket:
    type: string
    required: true
    description: "Output data bucket UUID"

Inputs are typed parameters you pass at runtime or bind in triggers. See Inputs syntax reference for the full input schema.
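
Because both inputs are bucket UUIDs, a quick local check can catch typos before you deploy. The sketch below is a convenience script, not part of the Workflows SDK; it assumes the YAML sits at examples/simple-pipeline.yaml and that PyYAML is installed:

import uuid

import yaml  # PyYAML: pip install pyyaml

with open("examples/simple-pipeline.yaml") as f:
    spec = yaml.safe_load(f)

# Every input bound in a schedule trigger should parse as a UUID.
for entry in spec["trigger"]["schedule"]:
    for name, value in entry["inputs"].items():
        try:
            uuid.UUID(value)
        except ValueError:
            raise SystemExit(f"{name} in trigger {entry['cron']!r} is not a UUID: {value}")

print("All trigger-bound bucket IDs parse as UUIDs")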

Triggers

The workflow uses two schedule entries:

trigger:
  schedule:
    - cron: "0 6 * * *"
      inputs:
        data_bucket: "f47ac10b-58cc-4372-a567-0e02b2c3d479"
        output_bucket: "8f4e3c2a-1b5d-4e8f-9a7c-6d3e2f1a0b9c"
    - cron: "0 18 * * *"
      inputs:
        data_bucket: "a1b2c3d4-e5f6-7890-1234-56789abcdef0"
        output_bucket: "b2c3d4e5-f6a7-8901-2345-6789abcdef01"

Each cron entry provides its own input values, so the same workflow runs against different buckets at different times — 6 AM against one set of buckets, 6 PM against another. See Triggers syntax reference.
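
You can sanity-check the cron expressions and preview upcoming fire times locally. This sketch uses the third-party croniter package, which is an assumption on our part and not part of the Workflows SDK:

from datetime import datetime, timezone

from croniter import croniter  # pip install croniter

now = datetime.now(timezone.utc)
for expr in ("0 6 * * *", "0 18 * * *"):
    next_run = croniter(expr, now).get_next(datetime)
    print(f"{expr} next fires at {next_run.isoformat()}")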

Environment variables

The env block maps input values to environment variables available across all jobs:

env:
  DATA_BUCKET: ${{ .inputs.data_bucket }}
  OUTPUT_BUCKET: ${{ .inputs.output_bucket }}
  SCRIPTS_REPO: "https://github.com/h2oai/data-pipeline"

The ${{ .inputs.data_bucket }} syntax is a Go template expression that resolves to the runtime input value. See Environment Variables and Expressions.
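
As a rough illustration of what that substitution does (the real rendering happens server-side in Go's template engine; the resolve helper below is hypothetical), you can model it with a regular expression:

import re

# Example bindings from the first schedule trigger.
inputs = {
    "data_bucket": "f47ac10b-58cc-4372-a567-0e02b2c3d479",
    "output_bucket": "8f4e3c2a-1b5d-4e8f-9a7c-6d3e2f1a0b9c",
}

def resolve(value: str) -> str:
    # Replace each ${{ .inputs.<name> }} with its bound input value.
    return re.sub(r"\$\{\{\s*\.inputs\.(\w+)\s*\}\}",
                  lambda m: inputs[m.group(1)], value)

print(resolve("${{ .inputs.data_bucket }}"))
# f47ac10b-58cc-4372-a567-0e02b2c3d479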

Jobs and dependencies

The workflow defines three jobs that form a linear pipeline:

  • fetch-data — no dependencies, runs first.
  • process — depends on fetch-data, runs after it completes.
  • publish — depends on process, runs last.
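
The depends_on edges define a topological order over the jobs. Python's standard-library graphlib can derive that order from the same edges; this is only an illustration of the scheduling semantics, since the Workflows engine resolves dependencies itself:

from graphlib import TopologicalSorter

# Map each job to the set of jobs it depends on.
deps = {
    "fetch-data": set(),
    "process": {"fetch-data"},
    "publish": {"process"},
}

print(list(TopologicalSorter(deps).static_order()))
# ['fetch-data', 'process', 'publish']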

Here is the process job definition:

process:
  name: Process data
  depends_on: [fetch-data]
  runner: cpu-large
  timeout: "1h"

Each job specifies a runner size to control compute resources and a timeout for maximum execution time. See Jobs syntax reference.
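
Timeouts are duration strings such as "30m" and "1h". If you need to interpret them client-side, the minimal sketch below handles only the second, minute, and hour suffixes used in this guide; the platform's own parser may accept more:

from datetime import timedelta

# Minimal duration parser covering the suffixes used in this guide.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours"}

def parse_timeout(value: str) -> timedelta:
    unit = value[-1]
    if unit not in _UNITS:
        raise ValueError(f"unsupported duration unit in {value!r}")
    return timedelta(**{_UNITS[unit]: int(value[:-1])})

print(parse_timeout("30m"), parse_timeout("1h"))  # 0:30:00 1:00:00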

Steps

Each job contains a sequence of steps. The fetch-data job uses three step types: download to fetch data from H2O Drive, run to execute shell commands, and upload to store results:

fetch-data:
  name: Fetch data from source
  runner: cpu-medium
  timeout: "30m"

  steps:
    - name: Download raw data
      download:
        source: drive://$DATA_BUCKET/raw-data/dataset.csv
        path: ./data/input.csv

    - name: Clone scripts
      run: git clone --depth 1 $SCRIPTS_REPO scripts

    - name: Install dependencies
      run: pip install -r scripts/requirements.txt

    - name: Validate data
      run: python scripts/validate.py --input data/input.csv

    - name: Upload validated data
      upload:
        path: data/input.csv
        destination: drive://$OUTPUT_BUCKET/validated/dataset.csv

See Steps syntax reference and Storage for full documentation on download, upload, and run step types.
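
Before deploying, you can dry-run a job's run steps on your own machine. The sketch below mirrors the fetch-data shell commands with subprocess; it assumes git, pip, and python are on your PATH and that data/input.csv already exists locally, since the download step normally fetches it from H2O Drive:

import subprocess

SCRIPTS_REPO = "https://github.com/h2oai/data-pipeline"

# Run the job's shell steps in order; check=True stops at the first
# failure, roughly matching how a failed step fails the job.
for cmd in (
    ["git", "clone", "--depth", "1", SCRIPTS_REPO, "scripts"],
    ["pip", "install", "-r", "scripts/requirements.txt"],
    ["python", "scripts/validate.py", "--input", "data/input.csv"],
):
    subprocess.run(cmd, check=True)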

Step 3: Deploy with the Python SDK

Once you have the YAML file, use the Python SDK to create and activate the workflow:

import h2o_workflows
from h2o_workflows.workflow.workflow import Workflow

# Connect to H2O AI Cloud
clients = h2o_workflows.login()

# Read the workflow YAML
with open("examples/simple-pipeline.yaml") as f:
    source = f.read()

# Create the workflow in your workspace
workflow = clients.workflow.create_workflow(
    parent="workspaces/my-workspace",
    workflow=Workflow(source_contents=source),
)
print(f"Created: {workflow.name}")

# Activate the workflow to enable scheduled triggers
clients.workflow.activate_workflow(name=workflow.name)
print("Workflow activated -- triggers are now live")

This creates the workflow and activates it so the cron triggers start firing. For the full client API, see the Python SDK Reference.

Complete YAML

id: simple-pipeline
name: Simple Pipeline

inputs:
  data_bucket:
    type: string
    required: true
    description: "Source data bucket UUID"

  output_bucket:
    type: string
    required: true
    description: "Output data bucket UUID"

trigger:
  schedule:
    - cron: "0 6 * * *"
      inputs:
        data_bucket: "f47ac10b-58cc-4372-a567-0e02b2c3d479"
        output_bucket: "8f4e3c2a-1b5d-4e8f-9a7c-6d3e2f1a0b9c"
    - cron: "0 18 * * *"
      inputs:
        data_bucket: "a1b2c3d4-e5f6-7890-1234-56789abcdef0"
        output_bucket: "b2c3d4e5-f6a7-8901-2345-6789abcdef01"

env:
  DATA_BUCKET: ${{ .inputs.data_bucket }}
  OUTPUT_BUCKET: ${{ .inputs.output_bucket }}
  SCRIPTS_REPO: "https://github.com/h2oai/data-pipeline"

jobs:
  fetch-data:
    name: Fetch data from source
    runner: cpu-medium
    timeout: "30m"

    steps:
      - name: Download raw data
        download:
          source: drive://$DATA_BUCKET/raw-data/dataset.csv
          path: ./data/input.csv

      - name: Clone scripts
        run: git clone --depth 1 $SCRIPTS_REPO scripts

      - name: Install dependencies
        run: pip install -r scripts/requirements.txt

      - name: Validate data
        run: python scripts/validate.py --input data/input.csv

      - name: Upload validated data
        upload:
          path: data/input.csv
          destination: drive://$OUTPUT_BUCKET/validated/dataset.csv

  process:
    name: Process data
    depends_on: [fetch-data]
    runner: cpu-large
    timeout: "1h"

    steps:
      - name: Download validated data
        download:
          source: drive://$OUTPUT_BUCKET/validated/dataset.csv
          path: ./data/input.csv

      - name: Clone scripts
        run: git clone --depth 1 $SCRIPTS_REPO scripts

      - name: Install dependencies
        run: pip install -r scripts/requirements.txt

      - name: Run processing
        run: python scripts/process.py --input data/input.csv --output data/output.csv

      - name: Upload processed data
        upload:
          path: data/output.csv
          destination: drive://$OUTPUT_BUCKET/processed/dataset.csv

  publish:
    name: Publish results
    depends_on: [process]
    runner: cpu-small
    timeout: "15m"

    steps:
      - name: Download processed data
        download:
          source: drive://$OUTPUT_BUCKET/processed/dataset.csv
          path: ./data/processed.csv

      - name: Clone scripts
        run: git clone --depth 1 $SCRIPTS_REPO scripts

      - name: Install dependencies
        run: pip install -r scripts/requirements.txt

      - name: Publish data
        run: python scripts/publish.py --input data/processed.csv

      - name: Upload final results
        upload:
          path: data/processed.csv
          destination: drive://$OUTPUT_BUCKET/published/dataset.csv

Next steps

