Scoring with GBQ
Use Google BigQuery (GBQ) as the source and sink for MLOps batch scoring.
Prerequisites
- `h2o_mlops_scoring_client-*-py3-none-any.whl` file or access to PyPI
- Java
- `gcs-connector-hadoop3-*-shaded.jar` file (download the latest `gcs-connector-hadoop3-*-shaded.jar` from https://github.com/GoogleCloudDataproc/hadoop-connectors/releases)
- `spark-bigquery-with-dependencies_2.12-0.24.2.jar` file (download `spark-bigquery-with-dependencies_2.12-0.24.2.jar` from https://github.com/GoogleCloudDataproc/spark-bigquery-connector/releases)
- Google Cloud service account with the following permissions:
  - `bigquery.jobs.create`
  - `bigquery.readsessions.create`
  - `bigquery.readsessions.getData`
  - `bigquery.tables.create`
  - `bigquery.tables.get`
  - `bigquery.tables.getData`
  - `bigquery.tables.update`
  - `bigquery.tables.updateData`
  - `storage.buckets.get`
  - `storage.objects.create`
  - `storage.objects.delete`
  - `storage.objects.get`
  - `storage.objects.list`
- The Google Cloud service account JSON key file (a quick validity check is sketched below)
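Before wiring the key file into Spark, you can confirm it loads. A minimal sketch using the google-auth library (an assumption: google-auth is installed separately, and the path is a placeholder):

```python
from google.oauth2 import service_account

# Confirm the service account JSON key file parses and report its identity.
credentials = service_account.Credentials.from_service_account_file(
    "/path/to/service-account.json"
)
print(credentials.service_account_email)
```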
Setup
- Install the `h2o_mlops_scoring_client` with `pip`.
- Create a `jars` folder for the GBQ and GCS Hadoop/Spark JARs and place the `gcs-connector-hadoop3-*-shaded.jar` and `spark-bigquery-with-dependencies_2.12-0.24.2.jar` files in it.
- Create a `conf` folder for default configuration files. In the `conf` folder, create a `spark-defaults.conf` file with the following contents (make sure to update the jars path, keeping the `/*` at the end, and update the service account JSON path):
```
spark.jars /path/to/jars/*
spark.hadoop.google.cloud.service.account.json.keyfile /path/to/service-account.json
spark.hadoop.google.cloud.auth.service.account.enable true
spark.hadoop.google.cloud.auth.type SERVICE_ACCOUNT_JSON_KEYFILE
spark.hadoop.fs.gs.impl com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
spark.hadoop.fs.AbstractFileSystem.gs.impl com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS
```
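If you prefer to script this step, here is a minimal Python sketch that writes the same `spark-defaults.conf` shown above (the paths are placeholders; adjust them to your environment):

```python
import pathlib

# Placeholder locations; adjust to your environment.
jars_path = "/path/to/jars"
keyfile_path = "/path/to/service-account.json"
conf_dir = pathlib.Path("/path/to/conf")

# Create the conf folder and write the default Spark configuration.
conf_dir.mkdir(parents=True, exist_ok=True)
(conf_dir / "spark-defaults.conf").write_text(
    f"spark.jars {jars_path}/*\n"
    f"spark.hadoop.google.cloud.service.account.json.keyfile {keyfile_path}\n"
    "spark.hadoop.google.cloud.auth.service.account.enable true\n"
    "spark.hadoop.google.cloud.auth.type SERVICE_ACCOUNT_JSON_KEYFILE\n"
    "spark.hadoop.fs.gs.impl com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem\n"
    "spark.hadoop.fs.AbstractFileSystem.gs.impl com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS\n"
)
```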
Notes
- If running locally, the number of cores used (and thus parallel processes) can be set with:

  ```python
  num_cores = 10
  h2o_mlops_scoring_client.spark_master = f"local[{num_cores}]"
  ```
- The GBQ connector requires the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to be set. If needed, this can be set through Python with:

  ```python
  import os

  os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/service-account.json"
  ```
- GBQ does not allow `.` in column names. For this reason, you must use a postprocess method that replaces `.` in column names, such as the one below (see Example Usage for details on how to use the method; a quick demonstration follows this list):

  ```python
  from pyspark.sql.functions import col


  def postprocess(spark_df):
      """Required for GBQ."""
      sanitized_column_names = {}
      for c in spark_df.columns:
          if "." in c:
              sanitized_column_names[c] = c.replace(".", "_")
      return spark_df.select(
          [col(f"`{c}`").alias(sanitized_column_names.get(c, c)) for c in spark_df.columns]
      )
  ```
- GBQ requires a `temporaryGcsBucket` Spark configuration to be set (see Example Usage for details on how to set it; alternatively, it can be added as a default to `spark-defaults.conf`).
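To see what the postprocess method above actually does, here is a quick demonstration against a throwaway local Spark session (illustrative only; the dotted column name is made up):

```python
from pyspark.sql import SparkSession

# Uses the postprocess function defined in the note above.
spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1, 0.42)], ["ID", "prediction.0"])
print(postprocess(df).columns)  # ['ID', 'prediction_0']
spark.stop()
```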
Example Usage
```python
import h2o_mlops_scoring_client
import os
```
Point the scorer to the `conf` directory you want to use.

```python
h2o_mlops_scoring_client.spark_conf_dir = os.path.expanduser("~/.h2o_mlops_scoring_client/gbq-conf")
```
Set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable if required.

```python
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = os.path.expanduser("~/.h2o_mlops_scoring_client/google-service-account.json")
```
Choose the MLOps scoring endpoint.

```python
MLOPS_ENDPOINT_URL = "https://model.internal.dedicated.h2o.ai/f325d002-3c3f-4283-9585-1569afc5dd12/model/score"
```
Set the tables to use, along with a unique ID column used to identify each score.

```python
ID_COLUMN = "ID"
SOURCE_DATA = "h2o-gce.bnpparibas.bnpparibas"
SINK_LOCATION = "h2o-gce.bnpparibas.mlops_scoring_client"
```
Set the source and sink to use GBQ tables.

```python
SOURCE_FORMAT = h2o_mlops_scoring_client.Format.BIGQUERY
SINK_FORMAT = h2o_mlops_scoring_client.Format.BIGQUERY
```
Set the sink write mode. Inspect the `WriteMode` value to see its behavior.

```python
h2o_mlops_scoring_client.WriteMode.OVERWRITE.value
# 'Overwrite existing files'

SINK_WRITE_MODE = h2o_mlops_scoring_client.WriteMode.OVERWRITE
```
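All write modes can be inspected the same way. A small sketch, assuming `WriteMode` is a standard Python `Enum` (which the `.value` access above suggests):

```python
# List each write mode and its described behavior.
for mode in h2o_mlops_scoring_client.WriteMode:
    print(mode.name, "->", mode.value)
```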
If the table is small, you will want to preprocess it into partitions to take advantage of parallel scoring. The number of partitions should equal the number of cores times 3. Large tables should be automatically partitioned by GBQ, and repartitioning them may have a detrimental effect on scoring time.
```python
def preprocess(spark_df):
    return spark_df.repartition(30)
```
Here we set up the postprocess method required for GBQ (see Notes).

```python
from pyspark.sql.functions import col


def postprocess(spark_df):
    """Required for GBQ."""
    sanitized_column_names = {}
    for c in spark_df.columns:
        if "." in c:
            sanitized_column_names[c] = c.replace(".", "_")
    return spark_df.select(
        [col(f"`{c}`").alias(sanitized_column_names.get(c, c)) for c in spark_df.columns]
    )
```
And now we score. Note the `spark_config_overrides` argument, which supplies the `temporaryGcsBucket` setting mentioned in the Notes section.

```python
h2o_mlops_scoring_client.score_source_sink(
    mlops_endpoint_url=MLOPS_ENDPOINT_URL,
    id_column=ID_COLUMN,
    source_data=SOURCE_DATA,
    source_format=SOURCE_FORMAT,
    sink_location=SINK_LOCATION,
    sink_format=SINK_FORMAT,
    sink_write_mode=SINK_WRITE_MODE,
    preprocess_method=preprocess,
    postprocess_method=postprocess,
    spark_config_overrides={
        "temporaryGcsBucket": "h2o-bq-large-dataset"
    },
)
```
```
23/05/02 17:47:11 INFO h2o_mlops_scoring_client: Starting Spark context
Default GBQ Spark configuration applied.
Warning: Ignoring non-Spark config property: temporaryGcsBucket
23/05/02 17:47:12 WARN Utils: Your hostname, M16Max-100638.local resolves to a loopback address: 127.0.0.1; using 192.168.1.8 instead (on interface en0)
23/05/02 17:47:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/05/02 17:47:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/02 17:47:13 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/05/02 17:47:13 INFO h2o_mlops_scoring_client: Connecting to H2O.ai MLOps scorer at 'https://model.cloud-qa.h2o.ai/fda0bb15-9561-49e5-80fc-3ffd1393216c/model/score'
23/05/02 17:47:16 INFO h2o_mlops_scoring_client: Applying preprocess method
23/05/02 17:47:16 INFO h2o_mlops_scoring_client: Starting scoring from 'h2o-gce.bnpparibas.bnpparibas' to 'h2o-gce.bnpparibas.mlops_scoring_client'
23/05/02 17:47:16 INFO h2o_mlops_scoring_client: Applying postprocess method
23/05/02 17:47:16 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/05/02 17:48:51 INFO h2o_mlops_scoring_client: Scoring complete
23/05/02 17:48:51 INFO h2o_mlops_scoring_client: Total run time: 0:01:40
23/05/02 17:48:51 INFO h2o_mlops_scoring_client: Scoring run time: 0:01:35
23/05/02 17:48:51 INFO h2o_mlops_scoring_client: Stopping Spark context
```
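Optionally, you can spot-check the sink table after the run. A minimal sketch using the google-cloud-bigquery client library (an assumption: it is installed separately and reuses the credentials set above):

```python
from google.cloud import bigquery

# Picks up GOOGLE_APPLICATION_CREDENTIALS set earlier.
client = bigquery.Client()
rows = client.query(
    "SELECT * FROM `h2o-gce.bnpparibas.mlops_scoring_client` LIMIT 5"
).result()
for row in rows:
    print(dict(row))
```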