Version: Next

Scoring with GBQ

Use Google BigQuery (GBQ) as both the source and the sink for MLOps batch scoring.

Prerequisites

  • A Google Cloud service account with the following permissions:
      • bigquery.jobs.create
      • bigquery.readsessions.create
      • bigquery.readsessions.getData
      • bigquery.tables.create
      • bigquery.tables.get
      • bigquery.tables.getData
      • bigquery.tables.update
      • bigquery.tables.updateData
      • storage.buckets.get
      • storage.objects.create
      • storage.objects.delete
      • storage.objects.get
      • storage.objects.list
  • The Google Cloud service account JSON file

Setup

  • Install the h2o_mlops_scoring_client with pip.
  • Create a jars folder for the GBQ and GCS Hadoop/Spark jars and place the gcs-connector-hadoop3-*-shaded.jar and spark-bigquery-with-dependencies_2.12-0.24.2.jar files in it.
  • Create a conf folder for default configuration files. In the conf folder, create a spark-defaults.conf file with the following contents. Update the jars path (keeping the /* at the end) and the service account JSON path:
spark.jars /path/to/jars/*
spark.hadoop.google.cloud.service.account.json.keyfile /path/to/service-account.json
spark.hadoop.google.cloud.auth.service.account.enable true
spark.hadoop.google.cloud.auth.type SERVICE_ACCOUNT_JSON_KEYFILE
spark.hadoop.fs.gs.impl com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
spark.hadoop.fs.AbstractFileSystem.gs.impl com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS
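
The conf folder and its spark-defaults.conf file can also be generated from Python. The following is a minimal sketch, not part of h2o_mlops_scoring_client: the function name write_spark_defaults and its arguments are hypothetical, and the keys written are exactly the six listed above.

```python
from pathlib import Path


def write_spark_defaults(conf_dir, jars_dir, keyfile):
    """Write spark-defaults.conf with the GBQ/GCS settings listed above.

    Hypothetical helper for illustration; not part of h2o_mlops_scoring_client.
    """
    conf_dir = Path(conf_dir).expanduser()
    conf_dir.mkdir(parents=True, exist_ok=True)

    # Keep the trailing /* so every jar in the folder is picked up.
    jars_glob = Path(jars_dir).expanduser().as_posix() + "/*"
    keyfile = Path(keyfile).expanduser().as_posix()

    settings = [
        ("spark.jars", jars_glob),
        ("spark.hadoop.google.cloud.service.account.json.keyfile", keyfile),
        ("spark.hadoop.google.cloud.auth.service.account.enable", "true"),
        ("spark.hadoop.google.cloud.auth.type", "SERVICE_ACCOUNT_JSON_KEYFILE"),
        ("spark.hadoop.fs.gs.impl",
         "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
        ("spark.hadoop.fs.AbstractFileSystem.gs.impl",
         "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
    ]

    conf_path = conf_dir / "spark-defaults.conf"
    conf_path.write_text("".join(f"{key} {value}\n" for key, value in settings))
    return conf_path
```

The returned path's parent folder is the value to assign to h2o_mlops_scoring_client.spark_conf_dir, as shown in Example Usage.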

Notes

  • If running locally, the number of cores used (and thus parallel processes) can be set with:
num_cores = 10
h2o_mlops_scoring_client.spark_master = f"local[{num_cores}]"
  • The GBQ connector requires the GOOGLE_APPLICATION_CREDENTIALS environment variable to be set. If needed, this can be done from Python with:
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/service-account.json"
  • GBQ does not allow "." in column names, so a postprocess method that replaces "." in column names is required. For example (see Example Usage for how to pass the method to the scorer):
from pyspark.sql.functions import col

def postprocess(spark_df):
    """Required for GBQ."""
    # Map every column name containing "." to a GBQ-safe name.
    sanitized_column_names = {}
    for c in spark_df.columns:
        if "." in c:
            sanitized_column_names[c] = c.replace(".", "_")
    # Backticks let Spark resolve column names that contain ".".
    return spark_df.select(
        [col(f"`{c}`").alias(sanitized_column_names.get(c, c)) for c in spark_df.columns]
    )
  • GBQ requires the temporaryGcsBucket Spark configuration to be set. It can be passed at scoring time (see Example Usage) or added as a default to spark-defaults.conf.
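
Replacing "." with "_" can produce duplicate column names, for example when the scored output contains both a.b and a_b. The following is a minimal sketch of a check that could be run on spark_df.columns before writing to GBQ. It works on plain Python lists of names, and the helper name sanitize_column_names is an assumption, not part of the client.

```python
from collections import Counter


def sanitize_column_names(columns):
    """Map each column name to a GBQ-safe name by replacing "." with "_".

    Hypothetical helper; raises ValueError if two columns would end up
    with the same sanitized name.
    """
    mapping = {c: c.replace(".", "_") for c in columns}
    counts = Counter(mapping.values())
    clashes = sorted(name for name, n in counts.items() if n > 1)
    if clashes:
        raise ValueError(f"Sanitized column names collide: {clashes}")
    return mapping


# Example with illustrative column names:
print(sanitize_column_names(["ID", "target.0", "target.1"]))
# {'ID': 'ID', 'target.0': 'target_0', 'target.1': 'target_1'}
```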

Example Usage

import h2o_mlops_scoring_client
import os

Point the scorer to the conf directory you want to use.

h2o_mlops_scoring_client.spark_conf_dir = os.path.expanduser("~/.h2o_mlops_scoring_client/gbq-conf")

Set the GOOGLE_APPLICATION_CREDENTIALS environment variable if required.

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = os.path.expanduser("~/.h2o_mlops_scoring_client/google-service-account.json")
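
Before starting a scoring job, it can help to confirm that the credentials path points at a real service account key. The sketch below is an optional check under stated assumptions: the function name check_service_account_file is hypothetical, and it relies only on the "type" and "client_email" fields that Google service account JSON key files contain.

```python
import json
import os


def check_service_account_file(path=None):
    """Return the service account email after basic sanity checks.

    Hypothetical helper; falls back to GOOGLE_APPLICATION_CREDENTIALS
    when no path is given.
    """
    path = path or os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not path:
        raise ValueError("GOOGLE_APPLICATION_CREDENTIALS is not set")
    path = os.path.expanduser(path)
    if not os.path.isfile(path):
        raise FileNotFoundError(path)
    with open(path) as f:
        info = json.load(f)
    if info.get("type") != "service_account":
        raise ValueError(f"{path} is not a service account key file")
    return info.get("client_email")
```

Calling check_service_account_file() with no arguments checks whatever path the environment variable currently holds.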

Choose the MLOps scoring endpoint.

MLOPS_ENDPOINT_URL = "https://model.internal.dedicated.h2o.ai/f325d002-3c3f-4283-9585-1569afc5dd12/model/score"

Set the tables to use along with a unique ID column used to identify each score.

ID_COLUMN = "ID"
SOURCE_DATA = "h2o-gce.bnpparibas.bnpparibas"
SINK_LOCATION = "h2o-gce.bnpparibas.mlops_scoring_client"
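
The table names above follow the project.dataset.table pattern. As a rough sketch, a small check like the following could catch a malformed name, or a sink that is the same table as the source, before any scoring starts. The helper name split_table_id is hypothetical, and the code assumes that no part of the name itself contains a ".".

```python
def split_table_id(table_id):
    """Split "project.dataset.table" into its three parts.

    Hypothetical helper; assumes none of the parts contains a ".".
    """
    parts = table_id.split(".")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"Expected 'project.dataset.table', got {table_id!r}")
    return tuple(parts)


source = split_table_id("h2o-gce.bnpparibas.bnpparibas")
sink = split_table_id("h2o-gce.bnpparibas.mlops_scoring_client")

# Writing scores back onto the source table is almost never intended.
if source == sink:
    raise ValueError("Source and sink refer to the same table")
```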

Set the source and sink to use GBQ tables.

SOURCE_FORMAT = h2o_mlops_scoring_client.Format.BIGQUERY
SINK_FORMAT = h2o_mlops_scoring_client.Format.BIGQUERY

Set the sink write mode. Look at the WriteMode value to see its behavior.

h2o_mlops_scoring_client.WriteMode.OVERWRITE.value
'Overwrite existing files'
SINK_WRITE_MODE = h2o_mlops_scoring_client.WriteMode.OVERWRITE

If the table is small, use a preprocess method to repartition it so that scoring can run in parallel. The number of partitions should equal the number of cores times 3. Large tables should be partitioned automatically by GBQ, and repartitioning them may have a detrimental effect on scoring time.

def preprocess(spark_df):
    return spark_df.repartition(30)
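
The value 30 corresponds to 10 cores times 3. To avoid hard-coding it, the partition count can be derived from the core count. This is a minimal sketch; make_preprocess and partitions_per_core are hypothetical names, and the only Spark call used is DataFrame.repartition.

```python
def make_preprocess(num_cores, partitions_per_core=3):
    """Build a preprocess method that repartitions to cores * 3 partitions.

    Hypothetical helper for illustration.
    """
    num_partitions = num_cores * partitions_per_core

    def preprocess(spark_df):
        # Redistribute rows so each core has several partitions to score.
        return spark_df.repartition(num_partitions)

    return preprocess


# With 10 local cores this is equivalent to spark_df.repartition(30).
preprocess = make_preprocess(num_cores=10)
```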

Here we set up the postprocess method required for GBQ (see notes).

from pyspark.sql.functions import col

def postprocess(spark_df):
    """Required for GBQ."""
    sanitized_column_names = {}
    for c in spark_df.columns:
        if "." in c:
            sanitized_column_names[c] = c.replace(".", "_")
    return spark_df.select(
        [col(f"`{c}`").alias(sanitized_column_names.get(c, c)) for c in spark_df.columns]
    )

And now we score. Note that the temporaryGcsBucket configuration mentioned in the Notes section is passed through spark_config_overrides.

h2o_mlops_scoring_client.score_source_sink(
    mlops_endpoint_url=MLOPS_ENDPOINT_URL,
    id_column=ID_COLUMN,
    source_data=SOURCE_DATA,
    source_format=SOURCE_FORMAT,
    sink_location=SINK_LOCATION,
    sink_format=SINK_FORMAT,
    sink_write_mode=SINK_WRITE_MODE,
    preprocess_method=preprocess,
    postprocess_method=postprocess,
    spark_config_overrides={
        "temporaryGcsBucket": "h2o-bq-large-dataset"
    }
)
23/05/02 17:47:11 INFO h2o_mlops_scoring_client: Starting Spark context


Default GBQ Spark configuration applied.


Warning: Ignoring non-Spark config property: temporaryGcsBucket
23/05/02 17:47:12 WARN Utils: Your hostname, M16Max-100638.local resolves to a loopback address: 127.0.0.1; using 192.168.1.8 instead (on interface en0)
23/05/02 17:47:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/05/02 17:47:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/02 17:47:13 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/05/02 17:47:13 INFO h2o_mlops_scoring_client: Connecting to H2O.ai MLOps scorer at 'https://model.cloud-qa.h2o.ai/fda0bb15-9561-49e5-80fc-3ffd1393216c/model/score'
23/05/02 17:47:16 INFO h2o_mlops_scoring_client: Applying preprocess method
23/05/02 17:47:16 INFO h2o_mlops_scoring_client: Starting scoring from 'h2o-gce.bnpparibas.bnpparibas' to 'h2o-gce.bnpparibas.mlops_scoring_client'
23/05/02 17:47:16 INFO h2o_mlops_scoring_client: Applying postprocess method
23/05/02 17:47:16 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/05/02 17:48:51 INFO h2o_mlops_scoring_client: Scoring complete
23/05/02 17:48:51 INFO h2o_mlops_scoring_client: Total run time: 0:01:40
23/05/02 17:48:51 INFO h2o_mlops_scoring_client: Scoring run time: 0:01:35
23/05/02 17:48:51 INFO h2o_mlops_scoring_client: Stopping Spark context
