Version: Next

Scoring with S3

Use S3 as source and sink for MLOps batch scoring.

Prerequisites

Setup

  • Install the h2o_mlops_scoring_client with pip.
  • Create a conf folder for default configuration files. In the conf folder, create a spark-defaults.conf file with the following contents, plus the settings for your chosen method of S3 authentication (for demonstration purposes, the access key and secret key method is shown here):
spark.hadoop.fs.s3a.access.key <fill in>
spark.hadoop.fs.s3a.secret.key <fill in>
spark.jars.packages org.apache.hadoop:hadoop-aws:3.3.4
spark.hadoop.fs.s3a.committer.name partitioned
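
The setup step above can also be scripted. The following is a minimal sketch, not part of the scoring client: `write_spark_defaults` is a hypothetical helper introduced here for illustration, and restricting the file permissions is an added precaution rather than a documented requirement.

```python
from pathlib import Path


def write_spark_defaults(conf_dir, access_key, secret_key):
    """Create conf_dir and write a spark-defaults.conf with the settings shown above."""
    conf_path = Path(conf_dir).expanduser()
    conf_path.mkdir(parents=True, exist_ok=True)
    lines = [
        f"spark.hadoop.fs.s3a.access.key {access_key}",
        f"spark.hadoop.fs.s3a.secret.key {secret_key}",
        "spark.jars.packages org.apache.hadoop:hadoop-aws:3.3.4",
        "spark.hadoop.fs.s3a.committer.name partitioned",
    ]
    target = conf_path / "spark-defaults.conf"
    target.write_text("\n".join(lines) + "\n")
    # The file holds credentials, so limit access to the current user.
    target.chmod(0o600)
    return target
```

Calling `write_spark_defaults("~/.h2o_mlops_scoring_client/s3-conf", "<fill in>", "<fill in>")` would create the conf directory that the example usage points the scorer to.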

Notes

  • If running locally, the number of cores used (and thus parallel processes) can be overridden with:
num_cores = 10
h2o_mlops_scoring_client.spark_master = f"local[{num_cores}]"
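
As a sketch of how the core count might be derived instead of hard-coded, the hypothetical helper `local_master` below builds the same `local[N]` string and falls back to the machine's CPU count. The helper name and the fallback behavior are assumptions for illustration, not part of the scoring client.

```python
import os


def local_master(num_cores=None):
    """Build a Spark master string for local mode, e.g. 'local[10]'."""
    if num_cores is None:
        # os.cpu_count() can return None, so fall back to a single core.
        num_cores = os.cpu_count() or 1
    if num_cores < 1:
        raise ValueError("num_cores must be at least 1")
    return f"local[{num_cores}]"


# Intended use, once the client has been imported:
# h2o_mlops_scoring_client.spark_master = local_master(10)
```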

Example Usage

import h2o_mlops_scoring_client
import os

Point the scorer to the conf directory you want to use.

h2o_mlops_scoring_client.spark_conf_dir = os.path.expanduser("~/.h2o_mlops_scoring_client/s3-conf")

Choose the MLOps scoring endpoint.

MLOPS_ENDPOINT_URL = "https://model.internal.dedicated.h2o.ai/f325d002-3c3f-4283-9585-1569afc5dd12/model/score"

Set the S3 source and sink locations (using s3a URIs), along with the unique ID column used to identify each score. Note that the source can be either a folder or a file, while the sink must be a folder.

ID_COLUMN = "ID"
SOURCE_DATA = "s3a://h2o-joeg/input/BNPParibas.csv"
SINK_LOCATION = "s3a://h2o-joeg/output/"
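
Because the locations must use the s3a scheme, a small check before scoring can catch a mistyped URI early. The sketch below is an illustration only: `to_s3a` is a hypothetical helper, and accepting `s3://` input and rewriting it to `s3a://` is an assumption about what is convenient, not behavior of the scoring client.

```python
from urllib.parse import urlparse


def to_s3a(uri):
    """Return uri with the s3a scheme, accepting s3:// or s3a:// input."""
    # Object keys containing '?' or '#' are not handled by this sketch.
    parsed = urlparse(uri)
    if parsed.scheme not in ("s3", "s3a"):
        raise ValueError(f"not an S3 URI: {uri!r}")
    if not parsed.netloc:
        raise ValueError(f"missing bucket name: {uri!r}")
    return f"s3a://{parsed.netloc}{parsed.path}"
```

For example, `to_s3a(SOURCE_DATA)` would leave the value above unchanged, while a local path such as `/tmp/data.csv` would raise a `ValueError`.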

Set the source and sink file type (here we demonstrate CSV and Parquet).

SOURCE_FORMAT = h2o_mlops_scoring_client.Format.CSV
SINK_FORMAT = h2o_mlops_scoring_client.Format.PARQUET

Set the sink write mode. Inspect the value of a WriteMode member to see a description of its behavior.

h2o_mlops_scoring_client.WriteMode.OVERWRITE.value
'Overwrite existing files'
SINK_WRITE_MODE = h2o_mlops_scoring_client.WriteMode.OVERWRITE

If the source contains only a few files (fewer than the number of cores), preprocess the table into partitions to take advantage of parallel scoring. The number of partitions should equal three times the number of cores (for example, 30 partitions for 10 cores). If the file count is larger than the number of cores, repartitioning may slow down scoring, because each individual file already counts as a partition.

def preprocess(spark_df):
    return spark_df.repartition(30)
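
To avoid hard-coding the partition count, the cores-times-three rule can be wrapped in a small factory. This is a sketch under stated assumptions: `make_preprocess` and its `partitions_per_core` parameter are hypothetical names introduced here, and the only Spark call used is `DataFrame.repartition`.

```python
def make_preprocess(num_cores, partitions_per_core=3):
    """Return a preprocess function that repartitions to num_cores * partitions_per_core."""
    num_partitions = num_cores * partitions_per_core

    def preprocess(spark_df):
        # DataFrame.repartition(n) returns a new DataFrame with n partitions.
        return spark_df.repartition(num_partitions)

    return preprocess


# With 10 local cores this reproduces the repartition(30) call above.
preprocess = make_preprocess(num_cores=10)
```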

And now we score.

h2o_mlops_scoring_client.score_source_sink(
    mlops_endpoint_url=MLOPS_ENDPOINT_URL,
    id_column=ID_COLUMN,
    source_data=SOURCE_DATA,
    source_format=SOURCE_FORMAT,
    sink_location=SINK_LOCATION,
    sink_format=SINK_FORMAT,
    sink_write_mode=SINK_WRITE_MODE,
    preprocess_method=preprocess,
)
23/03/16 10:48:42 INFO h2o_mlops_scoring_client: Starting Spark context


Default S3 Spark configuration applied.


23/03/16 10:48:43 WARN Utils: Your hostname, M16Max-100638.local resolves to a loopback address: 127.0.0.1; using 192.168.1.8 instead (on interface en0)
23/03/16 10:48:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/jgranados/.ivy2/cache
The jars for the packages stored in: /Users/jgranados/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-66cff7cf-abdd-43e1-9b2d-6ebb3b3327d7;1.0
confs: [default]


:: loading settings :: url = jar:file:/Users/jgranados/miniconda3/envs/h2o/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


found org.apache.hadoop#hadoop-aws;3.3.4 in central
found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 135ms :: artifacts dl 12ms
:: modules in use:
com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 3 | 0 | 0 | 0 || 3 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-66cff7cf-abdd-43e1-9b2d-6ebb3b3327d7
confs: [default]
0 artifacts copied, 3 already retrieved (0kB/7ms)
23/03/16 10:48:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/03/16 10:48:44 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/03/16 10:48:46 INFO h2o_mlops_scoring_client: Connecting to H2O.ai MLOps scorer at 'https://model.cloud-qa.h2o.ai/c0dcce87-ff7e-4c1b-8409-a015781c82b8/model/score'
23/03/16 10:48:46 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
23/03/16 10:49:09 INFO h2o_mlops_scoring_client: Applying preprocess method
23/03/16 10:49:09 INFO h2o_mlops_scoring_client: Starting scoring from 's3a://h2o-joeg/input/BNPParibas.csv' to 's3a://h2o-joeg/output/'
23/03/16 10:49:10 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/03/16 10:51:47 INFO h2o_mlops_scoring_client: Scoring complete
23/03/16 10:51:47 INFO h2o_mlops_scoring_client: Total run time: 0:03:05
23/03/16 10:51:47 INFO h2o_mlops_scoring_client: Scoring run time: 0:02:38
23/03/16 10:51:47 INFO h2o_mlops_scoring_client: Stopping Spark context
