Skip to content

Shared code

This page contains the shared code sections commonly used by the MLOps Python client examples.

  • Helper function

        def deployment_should_become_healthy(
            mlops_client: mlops.Client, deployment_id: str, max_wait_time: int = MAX_WAIT_TIME
        ):
            """Waits for the deployment to become healthy helper function."""
            svc = mlops_client.deployer.deployment_status
            status: mlops.DeployDeploymentStatus
            deadline = time.monotonic() + max_wait_time
    
            while True:
                time.sleep(REFRESH_STATUS_INTERVAL)
                status = svc.get_deployment_status(
                    mlops.DeployGetDeploymentStatusRequest(deployment_id=deployment_id)
                ).deployment_status
                if (
                    status.state == mlops.DeployDeploymentState.HEALTHY
                    or time.monotonic() > deadline
                ):
                    break
    
            return status
    
  • Convert the extracted metadata into storage compatible value objects.

        def convert_metadata(in_: mlops.IngestMetadata) -> mlops.StorageMetadata:
        """Converts extracted metadata into Storage compatible value objects."""
        values = {}
        for k, v in in_.values.items():
            i: mlops.IngestMetadataValue = v
    
            o = mlops.StorageValue(
                bool_value=i.bool_value,
                double_value=i.double_value,
                duration_value=i.duration_value,
                int64_value=i.int64_value,
                string_value=i.string_value,
                json_value=i.json_value,
                timestamp_value=i.timestamp_value,
            )
    
            values[k] = o
    
        return mlops.StorageMetadata(values=values)
    
  • Set up the token provider using an existing refresh token.

        mlops_token_provider = mlops.TokenProvider(
            refresh_token=REFRESH_TOKEN,
            client_id=CLIENT_ID,
            token_endpoint_url=TOKEN_ENDPOINT_URL,
        )
    
  • Set up the token provider using an existing refresh token and client secret.

        mlops_token_provider = mlops.TokenProvider(
            refresh_token=REFRESH_TOKEN,
            client_id=CLIENT_ID,
            token_endpoint_url=TOKEN_ENDPOINT_URL,
            client_secret=CLIENT_SECRET
        )
    
  • Set up the MLOps client.

        mlops_client = mlops.Client(
            gateway_url=MLOPS_API_URL,
            token_provider=mlops_token_provider,
        )
    
  • Create a project in MLOps and create an artifact in MLOps storage.

        # Creating a project in MLOps.
        prj: mlops.StorageProject = mlops_client.storage.project.create_project(
            mlops.StorageCreateProjectRequest(
                mlops.StorageProject(display_name=PROJECT_NAME)
            )
        ).project
    
        # Creating an artifact in MLOps Storage.
        artifact: mlops.StorageArtifact = mlops_client.storage.artifact.create_artifact(
            mlops.StorageCreateArtifactRequest(
                mlops.StorageArtifact(
                    entity_id=prj.id, mime_type=mimetypes.types_map[".zip"]
                )
            )
        ).artifact
    
  •     # Analyzing the MLflow zip file.
        ingestion: mlops.IngestMetadata = mlops_client.ingest.model.create_model_ingestion(
            mlops.IngestModelIngestion(artifact_id=artifact.id)
        ).ingestion
    
        model_metadata = convert_metadata(ingestion.model_metadata)
        model_params = mlops.StorageExperimentParameters(
            target_column=ingestion.model_parameters.target_column
        )
    
        # Creating an experiment from the MLflow zip file.
        experiment: mlops.StorageExperiment = (
            mlops_client.storage.experiment.create_experiment(
                mlops.StorageCreateExperimentRequest(
                    project_id=prj.id,
                    experiment=mlops.StorageExperiment(
                        display_name=EXPERIMENT_NAME,
                        metadata=model_metadata,
                        parameters=model_params,
                    ),
                )
            ).experiment
        )
    
        # Linking the artifact to the experiment.
        artifact.entity_id = experiment.id
        artifact.type = ingestion.artifact_type
    
        mlops_client.storage.artifact.update_artifact(
            mlops.StorageUpdateArtifactRequest(
                artifact=artifact, update_mask="type,entityId"
            )
        )
    
  • Fetch available deployment environments and search for the ID of the selected deployment environment for the Driverless AI client.

        # Fetching available deployment environments.
        deployment_envs: mlops.StorageListDeploymentEnvironmentsResponse = (
            mlops_client.storage.deployment_environment.list_deployment_environments(
                mlops.StorageListDeploymentEnvironmentsRequest(prj.key)
            )
        )
    
        # Looking for the ID of the selected deployment environment.
        for de in deployment_envs.deployment_environment:
            if de.display_name == DEPLOYMENT_ENVIRONMENT:
                deployment_env_id = de.id
                break
        else:
            raise LookupError("Requested deployment environment not found")
    
  • Fetch available deployment environments and search for the ID of the selected deployment environment for the MLOps client.

        # Fetching available deployment environments.
        deployment_envs: mlops.StorageListDeploymentEnvironmentsResponse = (
            mlops_client.storage.deployment_environment.list_deployment_environments(
                mlops.StorageListDeploymentEnvironmentsRequest(prj.id)
            )
        )
    
        # Looking for the ID of the selected deployment environment.
        for de in deployment_envs.deployment_environment:
            if de.display_name == DEPLOYMENT_ENVIRONMENT:
                deployment_env_id = de.id
                break
        else:
            raise LookupError("Requested deployment environment not found")
    
  • Customize the composition of the deployment and specify the deployment as a single deployment.

        # Customize the composition of the deployment
        composition = mlops.DeployDeploymentComposition(
            experiment_id=experiment.id,
            artifact_id=artifact.id,
            deployable_artifact_type_name="python/mlflow.zip",
            artifact_processor_name="unzip_processor",
            runtime_name="python-scorer_mlflow_38",
        )
    
        # Specify the deployment as a single deployment
        deployment = mlops.DeployDeployment(
            project_id=prj.id,
            deployment_environment_id=deployment_env_id,
            single_deployment=mlops.DeploySingleDeployment(
                deployment_composition=composition
            ),
        )
    
  • Create a deployment and wait for the deployment to become healthy.

        # Create the deployment (deploy the model).
        deployed_deployment = mlops_client.deployer.deployment.create_deployment(
            mlops.DeployCreateDeploymentRequest(deployment=to_deploy)
        ).deployment
    
        # Waiting for the deployment to become healthy.
        deployment_status = deployment_should_become_healthy(
            mlops_client, deployed_deployment.deployment.id
        )
        if deployment_status.state == mlops.DeployDeploymentState.HEALTHY:
            print("Deployment has become healthy")
        else:
            print(
                f"Deployment still not healthy after max wait time with state: {deployment_status.state}"
            )