Matrix data processing
This guide walks you through a workflow that processes data across six regions and two data types in parallel using matrix jobs. You will learn how to define a matrix, reference matrix values in expressions, and aggregate results from all parallel instances.
Prerequisites
- H2O AI Cloud access (see Access H2O Workflows).
- Python SDK installed (pip install h2o-workflows).
- H2O Drive bucket named regional-data.
Step 1: Understand the workflow
This workflow runs two jobs:
- process-regional-data uses a matrix to spawn 12 parallel instances (6 regions × 2 data types); each instance downloads, processes, and uploads data for its specific combination.
- aggregate-results runs after all 12 instances complete, collecting their metrics and generating a consolidated report.
Step 2: Walk through the YAML
Inputs
The workflow takes a single required input: processing_date, which specifies the date to process in YYYY-MM-DD format.
inputs:
  processing_date:
    type: string
    required: true
    description: "Date to process (YYYY-MM-DD)"
See Inputs for full input schema documentation.
Matrix configuration
The matrix is the key feature of this workflow. It tells H2O Workflows to create one job instance for every combination of region and data_type:
matrix:
  region: [us-east, us-west, eu-central, eu-west, ap-south, ap-northeast]
  data_type: [transactions, events]
With 6 regions and 2 data types, this produces 6 × 2 = 12 parallel job instances. The instances run concurrently, and each one accesses its own combination values via ${{ .matrix.region }} and ${{ .matrix.data_type }}. See Matrix Jobs for full documentation.
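If it helps to see the expansion concretely, the following standalone Python sketch lists the same 12 combinations. It is purely illustrative and not part of the workflow; H2O Workflows performs this expansion for you.

from itertools import product

# The same values as the matrix block above.
regions = ["us-east", "us-west", "eu-central", "eu-west", "ap-south", "ap-northeast"]
data_types = ["transactions", "events"]

# itertools.product yields every (region, data_type) pair: 6 x 2 = 12 combinations,
# one per parallel job instance.
for region, data_type in product(regions, data_types):
    print(f"instance: region={region}, data_type={data_type}")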
Dynamic paths with expressions
Each parallel instance works on a unique data path constructed from matrix values:
source: drive://${{ .env.DATA_BUCKET }}/raw/${{ .matrix.region }}/${{ .matrix.data_type }}/${{ .inputs.processing_date }}/
This expression combines the bucket name from env, the region and data type from matrix, and the date from inputs to produce a path like drive://regional-data/raw/us-east/transactions/2024-01-15/. See Expressions and Storage.
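The substitution itself is done by the workflow engine at runtime; as an illustration only, the equivalent string interpolation for one instance looks like this (the bucket name and date are the example values used throughout this guide):

# Example values for a single matrix instance (illustrative only).
data_bucket = "regional-data"      # from env.DATA_BUCKET
region = "us-east"                 # from matrix.region
data_type = "transactions"         # from matrix.data_type
processing_date = "2024-01-15"     # from inputs.processing_date

source = f"drive://{data_bucket}/raw/{region}/{data_type}/{processing_date}/"
print(source)  # drive://regional-data/raw/us-east/transactions/2024-01-15/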
Step-level environment variables
The processing step sets environment variables scoped to that single step only:
- name: Process data
  env:
    REGION: ${{ .matrix.region }}
    DATA_TYPE: ${{ .matrix.data_type }}
    DATE: ${{ .inputs.processing_date }}
  run: |
    echo "Processing $DATA_TYPE data for $REGION on $DATE"
    python scripts/process.py \
      --input ./data/raw/ \
      --output ./data/processed/ \
      --region $REGION \
      --type $DATA_TYPE \
      --date $DATE
Step-level env narrows scope so these variables only exist during this step's execution. See Environment Variables.
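The processing script itself lives in the data-pipeline repository and is not shown in this guide. As a rough, hypothetical sketch of the interface the step assumes (the real scripts/process.py may differ), it only needs to accept the flags passed above:

# Hypothetical sketch of scripts/process.py; the real script in the
# data-pipeline repo may look different. Only the CLI flags are taken
# from the workflow step above.
import argparse
from pathlib import Path

def main() -> None:
    parser = argparse.ArgumentParser(description="Process one region/data-type slice")
    parser.add_argument("--input", required=True, help="Directory of raw data")
    parser.add_argument("--output", required=True, help="Directory for processed data")
    parser.add_argument("--region", required=True)
    parser.add_argument("--type", dest="data_type", required=True)
    parser.add_argument("--date", required=True, help="Processing date (YYYY-MM-DD)")
    args = parser.parse_args()

    out_dir = Path(args.output)
    out_dir.mkdir(parents=True, exist_ok=True)

    # Placeholder for the actual transformation logic.
    print(f"Processing {args.data_type} data for {args.region} on {args.date}")

if __name__ == "__main__":
    main()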
Aggregation job
After all 12 matrix instances complete, the aggregation job collects their metrics and generates a report:
aggregate-results:
  name: Aggregate All Regions
  depends_on: [process-regional-data]
  runner: cpu-xlarge
  timeout: "1h"
The depends_on: [process-regional-data] dependency waits for ALL 12 matrix instances to complete before the aggregation job starts. This is the standard pattern for fan-out/fan-in with matrix jobs. See Jobs.
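The aggregation script is also part of the data-pipeline repository and is not shown here. A minimal, hypothetical sketch of the fan-in step it performs, assuming one metrics JSON file per matrix instance named with the region-type-date convention used by the "Upload metrics" step, could look like this:

# Hypothetical sketch of scripts/aggregate.py; the real aggregation script
# is in the data-pipeline repo. It merges one metrics file per matrix instance.
import argparse
import json
from pathlib import Path

def main() -> None:
    parser = argparse.ArgumentParser(description="Merge per-instance metrics")
    parser.add_argument("--input", required=True, help="Directory of downloaded metrics JSON files")
    parser.add_argument("--output", required=True, help="Directory for the merged result")
    parser.add_argument("--date", required=True, help="Processing date (YYYY-MM-DD)")
    args = parser.parse_args()

    merged = {"date": args.date, "instances": {}}
    for metrics_file in sorted(Path(args.input).glob("*.json")):
        # File names follow the <region>-<data_type>-<date>.json convention.
        merged["instances"][metrics_file.stem] = json.loads(metrics_file.read_text())

    out_dir = Path(args.output)
    out_dir.mkdir(parents=True, exist_ok=True)
    (out_dir / "aggregated.json").write_text(json.dumps(merged, indent=2))

if __name__ == "__main__":
    main()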
Step 3: Deploy with the Python SDK
import h2o_workflows
from h2o_workflows.workflow.workflow import Workflow

# Authenticate and obtain the H2O Workflows clients.
clients = h2o_workflows.login()

# Read the workflow definition from the example YAML file.
with open("examples/matrix-data-processing.yaml") as f:
    source = f.read()

# Create the workflow in the target workspace.
workflow = clients.workflow.create_workflow(
    parent="workspaces/my-workspace",
    workflow=Workflow(source_contents=source),
)
print(f"Created: {workflow.name}")
This workflow has no schedule trigger, so after creating it you activate it and trigger runs manually, or call it from another workflow. For the full client API, see the Python SDK Reference.
Complete YAML
id: matrix-data-processing
name: Parallel Regional Data Processing

inputs:
  processing_date:
    type: string
    required: true
    description: "Date to process (YYYY-MM-DD)"

env:
  DATA_BUCKET: "regional-data"
  SCRIPTS_REPO: "https://github.com/h2oai/data-pipeline"

jobs:
  # Process data for each region×type combination in parallel
  process-regional-data:
    name: Process Regional Data
    matrix:
      region: [us-east, us-west, eu-central, eu-west, ap-south, ap-northeast]
      data_type: [transactions, events]
    runner: cpu-large
    timeout: "2h"
    steps:
      - name: Download raw data
        download:
          source: drive://${{ .env.DATA_BUCKET }}/raw/${{ .matrix.region }}/${{ .matrix.data_type }}/${{ .inputs.processing_date }}/
          path: ./data/raw/
      - name: Clone processing scripts
        run: git clone --depth 1 $SCRIPTS_REPO scripts
      - name: Install dependencies
        run: pip install -r scripts/requirements.txt
      - name: Process data
        env:
          REGION: ${{ .matrix.region }}
          DATA_TYPE: ${{ .matrix.data_type }}
          DATE: ${{ .inputs.processing_date }}
        run: |
          echo "Processing $DATA_TYPE data for $REGION on $DATE"
          python scripts/process.py \
            --input ./data/raw/ \
            --output ./data/processed/ \
            --region $REGION \
            --type $DATA_TYPE \
            --date $DATE
      - name: Upload processed data
        upload:
          path: ./data/processed/
          destination: drive://${{ .env.DATA_BUCKET }}/processed/${{ .matrix.region }}/${{ .matrix.data_type }}/${{ .inputs.processing_date }}/
      - name: Generate metrics
        run: python scripts/generate_metrics.py --input ./data/processed/
      - name: Upload metrics
        upload:
          path: metrics.json
          destination: drive://${{ .env.DATA_BUCKET }}/metrics/${{ .matrix.region }}-${{ .matrix.data_type }}-${{ .inputs.processing_date }}.json

  # Aggregate results after all 12 regional processing instances complete (6 regions × 2 data types)
  aggregate-results:
    name: Aggregate All Regions
    depends_on: [process-regional-data]
    runner: cpu-xlarge
    timeout: "1h"
    steps:
      - name: Download all metrics
        download:
          source: drive://${{ .env.DATA_BUCKET }}/metrics/
          path: ./metrics/
      - name: Clone aggregation scripts
        run: git clone --depth 1 $SCRIPTS_REPO scripts
      - name: Aggregate metrics
        env:
          DATE: ${{ .inputs.processing_date }}
        run: |
          python scripts/aggregate.py \
            --input ./metrics/ \
            --output ./aggregated/ \
            --date $DATE
      - name: Upload aggregated results
        upload:
          path: ./aggregated/
          destination: drive://${{ .env.DATA_BUCKET }}/aggregated/${{ .inputs.processing_date }}/
      - name: Generate report
        run: python scripts/report.py --input ./aggregated/ --output report.html
      - name: Upload report
        upload:
          path: report.html
          destination: drive://${{ .env.DATA_BUCKET }}/reports/${{ .inputs.processing_date }}.html
Next steps
- Reusable Workflow — Create callable workflows with secret management.