
Pump data

This example demonstrates how you can get information about all deployments in a specified project and start sending data to those deployments for scoring using the payload of their sample requests. This uses asynchronous scoring calls, which makes it a good tool for testing a large number of deployments simultaneously.
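The asynchronous fan-out at the heart of the example, one sender coroutine per deployment, all gathered concurrently, can be sketched in plain asyncio before walking through the real script. Everything below is illustrative: `send_forever` and `pump` are hypothetical stand-ins, and the real script replaces the list append with an aiohttp POST to each deployment's scoring URL.

```python
import asyncio

async def send_forever(deployment_id: str, payload: dict, rounds: int, sent: list) -> None:
    """Send `payload` for one deployment at a fixed interval, `rounds` times.
    The append stands in for the real HTTP scoring call."""
    for _ in range(rounds):
        sent.append((deployment_id, payload))
        await asyncio.sleep(0)  # stands in for SEND_INTERVAL

async def pump(payloads: dict, rounds: int) -> list:
    """Start one sender task per deployment and run them concurrently."""
    sent: list = []
    tasks = [
        asyncio.ensure_future(send_forever(dep_id, payload, rounds, sent))
        for dep_id, payload in payloads.items()
    ]
    await asyncio.gather(*tasks)
    return sent

sent = asyncio.run(pump({"dep-a": {"x": 1}, "dep-b": {"x": 2}}, rounds=3))
print(len(sent))  # 2 deployments x 3 rounds = 6 sends
```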

Before you begin

  • Install aiohttp
  • You will need the values for the following constants in order to successfully carry out the task. Contact your administrator to obtain deployment-specific values.
  • MLOPS_API_URL: Defines the URL for the MLOps Gateway component.
  • TOKEN_ENDPOINT_URL (for example, https://mlops.keycloak.domain/auth/realms/[fill-in-realm-name]/protocol/openid-connect/token): Defines the token endpoint URL of the Identity Provider. This example uses Keycloak as the Identity Provider, so the Keycloak realm must be provided.
  • REFRESH_TOKEN (for example, <your-refresh-token>): Defines the user's refresh token.
  • CLIENT_ID (for example, <your-client-id>): Sets the client ID used for authentication. This is the client you will be using to connect to MLOps.
  • PROJECT_NAME (for example, StandardFlow): Defines the project name that the script will use. This example uses the project created in the Driverless AI example.
  • SEND_INTERVAL (for example, 2.0): Defines the time interval at which requests are sent. This example sends a sample request every 2 seconds.
  • MAX_CONSECUTIVE_ERRORS (for example, 5): Stops sending requests to an endpoint after the defined number of consecutive errors. This example stops sending requests after 5 consecutive errors.
  • CONNECTION_LIMIT (for example, 100): Defines the maximum number of open TCP connections. This example uses a connection limit of 100.
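As a quick illustration of the MAX_CONSECUTIVE_ERRORS rule, here is a small stand-alone sketch. It assumes, as "consecutive" implies, that the error counter resets on every successful response, so only an unbroken run of failures stops the sender; the function name is illustrative, not part of the script.

```python
MAX_CONSECUTIVE_ERRORS = 5

def stops_after(statuses):
    """Return how many responses are processed before the sender gives up,
    or len(statuses) if it never does. `statuses` are HTTP status codes."""
    error_count = 0
    for i, status in enumerate(statuses, start=1):
        if status != 200:
            error_count += 1
        else:
            error_count = 0  # a success breaks the consecutive run
        if error_count >= MAX_CONSECUTIVE_ERRORS:
            return i
    return len(statuses)

print(stops_after([500] * 5))            # 5: five consecutive failures stop the sender
print(stops_after([500, 500, 200] * 9))  # 27: successes reset the counter, so it never stops
```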

The following steps demonstrate how you can use the MLOps Python client to send data to the deployments of a specified project for scoring.

  1. Download the file.

  2. Change the values of the following constants in your file as given in the preceding table.
        ### Constants
        MLOPS_API_URL = ""
        TOKEN_ENDPOINT_URL = ""
        REFRESH_TOKEN = ""
        CLIENT_ID = ""
        PROJECT_NAME = "PumpDataExample"
        SEND_INTERVAL = 2.0
        MAX_CONSECUTIVE_ERRORS = 5
        CONNECTION_LIMIT = 100
  3. Run the file.

  4. Navigate to MLOps and click the project name StandardFlow under Projects. Then, click the name of a deployment to open the Deployment Overview side panel, and click View monitoring to see the monitoring details.



    For more information about deployments in MLOps, see Understanding deployments in MLOps.

  5. Under the Health panel, you can see the predictions made for the scoring data sent by the script.


Example walkthrough

This section provides a walkthrough of the file.

  1. Include the following _get_project_id function to get the project ID of the defined project name.
    def _get_project_id(projects: List[mlops.StorageProject]) -> str:
        """Gets the ID of the selected project."""
        for p in projects:
            if p.display_name == PROJECT_NAME:
                return p.id
        raise LookupError("Requested project not found")
  2. Include the _get_sample_payload function to get a sample payload from the sample request deployment endpoint.
    async def _get_sample_payload(
        sample_request_url: str, deployment_id: str, http_session: aiohttp.ClientSession
    ) -> Dict[str, dict]:
        """Gets sample request payload from the sample request deployment endpoint."""
        async with http_session.get(sample_request_url) as r:
            sample_payload = await r.json()
        return {deployment_id: sample_payload}
  3. Include the get_deployment_statuses function to get the deployment statuses of the project. This first gets all the projects of the user and then gets the deployment statuses of the project the user has specified in the script.
    def get_deployment_statuses(client: mlops.Client) -> List[mlops.DeployDeploymentStatus]:
        """Gets deployment metadata."""
        # Getting all user's projects.
        projects: mlops.StorageListProjectsResponse = (
            client.storage.project.list_projects(mlops.StorageListProjectsRequest())
        )
        prj_id = _get_project_id(projects.project)
        # Getting all deployment statuses for the selected project.
        statuses: mlops.DeployListDeploymentStatusesResponse = (
            client.deployer.deployment_status.list_deployment_statuses(
                mlops.DeployListDeploymentStatusesRequest(project_id=prj_id)
            )
        )
        return statuses.deployment_status
  4. Include the get_sample_payloads function to get sample payloads from a collection of deployments metadata.
    async def get_sample_payloads(
        deployment_statuses: List[mlops.DeployDeploymentStatus],
        http_session: aiohttp.ClientSession,
    ) -> dict:
        """Gets sample payloads from a collection of deployments metadata."""
        requests_futures = []
        for ds in deployment_statuses:
            sample_payload_awaitable = _get_sample_payload(
                ds.scorer.sample_request.url, ds.deployment_id, http_session
            )
            sample_payload_future = asyncio.ensure_future(sample_payload_awaitable)
            requests_futures.append(sample_payload_future)
        sample_payloads_nested = await asyncio.gather(*requests_futures)
        sample_payloads = {}
        for payload in sample_payloads_nested:
            sample_payloads.update(payload)
        return sample_payloads
  5. Include the send_sample_request function to continuously send sample requests to the specified scoring URL.
    async def send_sample_request(
        scoring_url: str, payload: dict, http_session: aiohttp.ClientSession
    ) -> None:
        """Continuously sends sample requests to the specified scoring URL."""
        error_count = 0
        while error_count < MAX_CONSECUTIVE_ERRORS:
            async with http_session.post(scoring_url, json=payload) as r:
                if r.status != 200:
                    error_count += 1
                    print(
                        f"Failed sending data to: {scoring_url} with status: {r.status}, response: {await r.text()}"
                    )
                else:
                    # A successful response breaks the run of consecutive errors.
                    error_count = 0
            await asyncio.sleep(SEND_INTERVAL)
        print(f"Stopped sending requests to: {scoring_url}")
  6. In the main function,

    1. Set up the TCP connection limit.
      conn = aiohttp.TCPConnector(limit=CONNECTION_LIMIT)
    2. Set up the token provider using an existing refresh token.
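      The token provider's code is not shown above, but the call it wraps is a standard OpenID Connect refresh-token grant POSTed to TOKEN_ENDPOINT_URL. As a hedged sketch of that underlying request, with an illustrative helper name (the real file uses the MLOps client's own token provider rather than building the body by hand):

      ```python
      from urllib.parse import urlencode

      def build_refresh_grant(client_id: str, refresh_token: str) -> str:
          """Form-encoded body for the OIDC refresh_token grant that a token
          provider POSTs to TOKEN_ENDPOINT_URL (helper name is illustrative)."""
          return urlencode({
              "grant_type": "refresh_token",
              "client_id": client_id,
              "refresh_token": refresh_token,
          })

      body = build_refresh_grant("my-client", "my-refresh-token")
      print(body)
      ```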

    3. Set up the MLOps client.

    4. Get the deployment metadata by calling the get_deployment_statuses function.
      deployment_statuses = get_deployment_statuses(mlops_client)
    5. Include the client session.
      async with aiohttp.ClientSession(connector=conn) as sessions:
          sample_payloads = await get_sample_payloads(deployment_statuses, sessions)
          sample_requests_futures = []
          for ds in deployment_statuses:
              send_request_awaitable = send_sample_request(
                  ds.scorer.score.url, sample_payloads[ds.deployment_id], sessions
              )
              send_request_future = asyncio.ensure_future(send_request_awaitable)
              sample_requests_futures.append(send_request_future)
          await asyncio.gather(*sample_requests_futures)


      Although having a single client session may seem restrictive, this context manager is actually a connection pool and should be treated as such. Creating multiple client sessions here can cause race conditions. For more information, see Client session in the aiohttp documentation.
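
      The effect of CONNECTION_LIMIT on that shared pool can be imitated with a semaphore: however many tasks are gathered, at most that many "connections" are in flight at once. This is a pure-asyncio sketch in which the semaphore and `fake_request` stand in for aiohttp's TCPConnector and the real HTTP round trips.

      ```python
      import asyncio

      CONNECTION_LIMIT = 3  # small value so the cap is easy to see

      async def fake_request(pool: asyncio.Semaphore, in_flight: list, peak: list) -> None:
          """Acquire a pooled 'connection', record the concurrency level, release it."""
          async with pool:
              in_flight[0] += 1
              peak[0] = max(peak[0], in_flight[0])
              await asyncio.sleep(0.001)  # stands in for the HTTP round trip
              in_flight[0] -= 1

      async def main() -> int:
          pool = asyncio.Semaphore(CONNECTION_LIMIT)
          in_flight, peak = [0], [0]
          # 20 tasks compete for the pool, but at most CONNECTION_LIMIT run at once.
          await asyncio.gather(*(fake_request(pool, in_flight, peak) for _ in range(20)))
          return peak[0]

      peak = asyncio.run(main())
      print(peak)  # never exceeds CONNECTION_LIMIT
      ```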

  7. Finally, include the following code to run the main function.
    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())