Our Python SDK got smarter. We developed a TypeScript SDK too. We are updating our SDK code blocks. Python SDK here. TypeScript SDK here.
Description
Advanced

Batch Evaluation Calls

This script demonstrates how to use concurrent calls with the Patronus API to parallelize evaluation requests. We provide an example with a pandas DataFrame, but you can adapt the script to be used with any dataset to execute batch evaluations in a shorter amount of time.

For more information on running concurrent calls in Python, take a look at the docs here.

import time
import requests
import pandas as pd
import numpy as np
 
PATRONUS_API_KEY = "TODO"
 
run_id = "run-at-" + time.strftime("%Y%m%d-%H%M%S")
 
df = pd.read_csv("TODO")
 
headers = {
    "Content-Type": "application/json",
    "X-API-KEY": PATRONUS_API_KEY,
}
 
 
# Send request
def send_request(data):
    try:
        response = requests.post(
            "https://api.patronus.ai/v1/evaluate",
            headers=headers,
            json=data,
        )
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        print(f"HTTP error occurred: {e}")
        print(f"Response content: {response.content}")
    except requests.exceptions.RequestException as e:
        print(f"Other error occurred: {e}")
    return response
 
 
def process_row_with_evaluator(row_id, row, evaluator_config, max_retries=3):
    question = str(row["question"])
    text = str(row["generated_text"])
 
    data = {
        "evaluators": [evaluator_config],
        "evaluated_model_input": question,
        "evaluated_model_output": text,
        "app": "TODO",
        "tags": {"run-id": run_id},
        "capture": "all",  # Set to "all" to capture all logs
        "explain": False,  # Set to True to generate explanations
    }
 
    reference_name = (
        evaluator_config["profile_name"]
        if "profile_name" in evaluator_config
        else evaluator_config["evaluator"]
    )
 
    attempt = 0
    while attempt < max_retries:
        attempt += 1
        try:
            response = send_request(data)
            if (
                response.status_code == 200
                and response.json()["results"][0]["status"] == "success"
            ):
                result = response.json()["results"][0]
                passed = bool(result["evaluation_result"]["pass"])
                # print(
                #     f"Row {row_id + 1} with {reference_name} completed on attempt {attempt}",
                # )
                return row_id, reference_name, passed
            else:
                print(
                    f"Row {row_id + 1} with {reference_name} did NOT complete on {attempt} of {max_retries}",
                )
                print(f"Received status code {response.status_code}")
                print(f"Response body: {response.json()}")
                print(f"Request data: {data}")
 
        except Exception as e:
            print(
                f"Row {row_id + 1} with {reference_name} generated an exception on attempt {attempt} of {max_retries}",
            )
            print(f"Exception message: {e}")
            print(f"Request data: {data}")
 
        time.sleep(2)
 
    print(f"All {max_retries} failed for row {row_id + 1}")
    return row_id, None, None
 
 
from concurrent.futures import ThreadPoolExecutor, as_completed
 
NUM_WORKERS = 10  # Number of workers to use
NUM_RETRIES = 3  # Number of retries for each row
TIMEOUT = 10  # Timeout for each request
 
evaluator_configs = [
    {
        "evaluator": "custom",
        "profile_name": "TODO",
    },
]
 
print(f"Number of rows: {df.shape[0]}")
 
num_completed = 0
 
with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
    future_to_row = {
        executor.submit(
            process_row_with_evaluator, row_id, row, evaluator_config, NUM_RETRIES
        ): (
            row_id,
            evaluator_config,
        )
        for row_id, row in df.iterrows()
        for evaluator_config in evaluator_configs
    }
 
    for future in as_completed(future_to_row):
        row_id, evaluator_config = future_to_row[future]
        try:
            _, reference_name, passed = future.result(timeout=TIMEOUT)
            if reference_name is not None and passed is not None:
                df.at[row_id, reference_name] = passed
            print(
                f"Completed future result from row {row_id + 1} with {reference_name}"
            )
            num_completed += 1
        except Exception as e:
            print(f"Issue with future result from row {row_id + 1}")
            print(f"Exception: {e}")
 
        if num_completed % (10 * len(evaluator_configs)) == 0:
            print(f"Completed {num_completed // len(evaluator_configs)} rows")

On this page

No Headings