Shared code in examples
This page contains the shared code sections commonly used by the MLOps Python client examples.
Helper function
def deployment_should_become_healthy(
mlops_client: mlops.Client, deployment_id: str, max_wait_time: int = MAX_WAIT_TIME
):
"""Waits for the deployment to become healthy helper function."""
svc = mlops_client.deployer.deployment_status
status: mlops.DeployDeploymentStatus
deadline = time.monotonic() + max_wait_time
while True:
time.sleep(REFRESH_STATUS_INTERVAL)
status = svc.get_deployment_status(
mlops.DeployGetDeploymentStatusRequest(deployment_id=deployment_id)
).deployment_status
if (
status.state == mlops.DeployDeploymentState.HEALTHY
or time.monotonic() > deadline
):
break
return statusConvert the extracted metadata into storage compatible value objects.
def convert_metadata(in_: mlops.IngestMetadata) -> mlops.StorageMetadata:
"""Converts extracted metadata into Storage compatible value objects."""
values = {}
for k, v in in_.values.items():
i: mlops.IngestMetadataValue = v
o = mlops.StorageValue(
bool_value=i.bool_value,
double_value=i.double_value,
duration_value=i.duration_value,
int64_value=i.int64_value,
string_value=i.string_value,
json_value=i.json_value,
timestamp_value=i.timestamp_value,
)
values[k] = o
return mlops.StorageMetadata(values=values)
Set up the token provider using an existing refresh token.
mlops_token_provider = mlops.TokenProvider(
refresh_token=REFRESH_TOKEN,
client_id=CLIENT_ID,
token_endpoint_url=TOKEN_ENDPOINT_URL,
)Set up the token provider using an existing refresh token and client secret.
mlops_token_provider = mlops.TokenProvider(
refresh_token=REFRESH_TOKEN,
client_id=CLIENT_ID,
token_endpoint_url=TOKEN_ENDPOINT_URL,
client_secret=CLIENT_SECRET
)Set up the MLOps client.
mlops_client = mlops.Client(
gateway_url=MLOPS_API_URL,
token_provider=mlops_token_provider,
)Create a project in MLOps and create an artifact in MLOps storage.
# Creating a project in MLOps.
prj: mlops.StorageProject = mlops_client.storage.project.create_project(
mlops.StorageCreateProjectRequest(
mlops.StorageProject(display_name=PROJECT_NAME)
)
).project
# Creating an artifact in MLOps Storage.
artifact: mlops.StorageArtifact = mlops_client.storage.artifact.create_artifact(
mlops.StorageCreateArtifactRequest(
mlops.StorageArtifact(
entity_id=prj.id, mime_type=mimetypes.types_map[".zip"]
)
)
).artifactAnalyze the MLflow .zip file and create an experiment from it. Then link the artifact to the experiment.
# Analyzing the MLflow zip file.
ingestion: mlops.IngestMetadata = mlops_client.ingest.model.create_model_ingestion(
mlops.IngestModelIngestion(artifact_id=artifact.id)
).ingestion
model_metadata = convert_metadata(ingestion.model_metadata)
model_params = mlops.StorageExperimentParameters(
target_column=ingestion.model_parameters.target_column
)
# Creating an experiment from the MLflow zip file.
experiment: mlops.StorageExperiment = (
mlops_client.storage.experiment.create_experiment(
mlops.StorageCreateExperimentRequest(
project_id=prj.id,
experiment=mlops.StorageExperiment(
display_name=EXPERIMENT_NAME,
metadata=model_metadata,
parameters=model_params,
),
)
).experiment
)
# Linking the artifact to the experiment.
artifact.entity_id = experiment.id
artifact.type = ingestion.artifact_type
mlops_client.storage.artifact.update_artifact(
mlops.StorageUpdateArtifactRequest(
artifact=artifact, update_mask="type,entityId"
)
)Fetch available deployment environments and search for the ID of the selected deployment environment for the Driverless AI client.
# Fetching available deployment environments.
deployment_envs: mlops.StorageListDeploymentEnvironmentsResponse = (
mlops_client.storage.deployment_environment.list_deployment_environments(
mlops.StorageListDeploymentEnvironmentsRequest(prj.key)
)
)
# Looking for the ID of the selected deployment environment.
for de in deployment_envs.deployment_environment:
if de.display_name == DEPLOYMENT_ENVIRONMENT:
deployment_env_id = de.id
break
else:
raise LookupError("Requested deployment environment not found")Fetch available deployment environments and search for the ID of the selected deployment environment for the MLOps client.
# Fetching available deployment environments.
deployment_envs: mlops.StorageListDeploymentEnvironmentsResponse = (
mlops_client.storage.deployment_environment.list_deployment_environments(
mlops.StorageListDeploymentEnvironmentsRequest(prj.id)
)
)
# Looking for the ID of the selected deployment environment.
for de in deployment_envs.deployment_environment:
if de.display_name == DEPLOYMENT_ENVIRONMENT:
deployment_env_id = de.id
break
else:
raise LookupError("Requested deployment environment not found")Customize the composition of the deployment and specify the deployment as a single deployment.
# Customize the composition of the deployment
composition = mlops.DeployDeploymentComposition(
experiment_id=experiment.id,
artifact_id=artifact.id,
deployable_artifact_type_name="python/mlflow.zip",
artifact_processor_name="unzip_processor",
runtime_name="python-scorer_mlflow_38",
)
# Specify the deployment as a single deployment
deployment = mlops.DeployDeployment(
project_id=prj.id,
deployment_environment_id=deployment_env_id,
single_deployment=mlops.DeploySingleDeployment(
deployment_composition=composition
),
)
Create a deployment and wait for the deployment to become healthy.
# Create the deployment (deploy the model).
deployed_deployment = mlops_client.deployer.deployment.create_deployment(
mlops.DeployCreateDeploymentRequest(deployment=to_deploy)
).deployment
# Waiting for the deployment to become healthy.
deployment_status = deployment_should_become_healthy(
mlops_client, deployed_deployment.deployment.id
)
if deployment_status.state == mlops.DeployDeploymentState.HEALTHY:
print("Deployment has become healthy")
else:
print(
f"Deployment still not healthy after max wait time with state: {deployment_status.state}"
)
Feedback
- Submit and view feedback for this page
- Send feedback about H2O MLOps to cloud-feedback@h2o.ai