"""Batch Evaluation Script

This script demonstrates how to use concurrent calls with the Patronus API to
parallelize evaluation requests. We provide an example with a pandas DataFrame,
but you can adapt the script to any dataset to execute batch evaluations in a
shorter amount of time.

For more information on running concurrent calls in Python, see the standard
library documentation for ``concurrent.futures``.
"""

import time
import requests
import pandas as pd
import numpy as np

# Patronus API key, sent with every request via the X-API-KEY header.
PATRONUS_API_KEY = "TODO"

# Timestamp-based identifier attached to every evaluation as a "run-id" tag,
# so all results from this script invocation can be grouped together.
run_id = "run-at-" + time.strftime("%Y%m%d-%H%M%S")

# Dataset to evaluate; must contain "question" and "generated_text" columns
# (read by process_row_with_evaluator below).
df = pd.read_csv("TODO")

# Shared HTTP headers for all evaluation requests.
headers = {
    "Content-Type": "application/json",
    "X-API-KEY": PATRONUS_API_KEY,
}


# Send request
def send_request(data):
    """POST one evaluation payload to the Patronus /v1/evaluate endpoint.

    Args:
        data: JSON-serializable request body (evaluators, model input/output,
            tags, etc.).

    Returns:
        The ``requests.Response``. On an HTTP error status (4xx/5xx) the error
        is logged and the response is still returned so the caller can inspect
        the status code and body.

    Raises:
        requests.exceptions.RequestException: on connection-level failures
            (timeout, DNS, refused connection), after logging the error.
    """
    response = None
    try:
        response = requests.post(
            "https://api.patronus.ai/v1/evaluate",
            headers=headers,
            json=data,
        )
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        # raise_for_status() only fires after a response exists, so it is
        # safe to log the body here and fall through to `return response`.
        print(f"HTTP error occurred: {e}")
        print(f"Response content: {response.content}")
    except requests.exceptions.RequestException as e:
        # No response was ever produced. The original code fell through to
        # `return response` here and crashed with NameError, masking the real
        # error; re-raise instead so the caller's retry loop can handle it.
        print(f"Other error occurred: {e}")
        raise
    return response


def process_row_with_evaluator(row_id, row, evaluator_config, max_retries=3):
    """Evaluate one DataFrame row with one evaluator, retrying on failure.

    Args:
        row_id: Index label of the row (used for logging and, by the caller,
            for ``df.at[row_id, ...]`` writes).
        row: pandas Series with "question" and "generated_text" entries.
        evaluator_config: dict with an "evaluator" key and, optionally, a
            "profile_name" key selecting a specific evaluator profile.
        max_retries: Maximum number of request attempts before giving up.

    Returns:
        ``(row_id, reference_name, passed)`` on success, where ``passed`` is
        the boolean pass/fail result, or ``(row_id, None, None)`` after all
        attempts fail.
    """
    data = {
        "evaluators": [evaluator_config],
        "evaluated_model_input": str(row["question"]),
        "evaluated_model_output": str(row["generated_text"]),
        "app": "TODO",
        "tags": {"run-id": run_id},
        "capture": "all",  # Set to "all" to capture all logs
        "explain": False,  # Set to True to generate explanations
    }

    # Column name under which the caller stores the boolean result: the
    # profile name when present, otherwise the evaluator name.
    reference_name = evaluator_config.get(
        "profile_name", evaluator_config["evaluator"]
    )

    for attempt in range(1, max_retries + 1):
        try:
            response = send_request(data)
            # Parse the body exactly once per attempt (the original called
            # response.json() up to three times).
            body = response.json()
            if (
                response.status_code == 200
                and body["results"][0]["status"] == "success"
            ):
                result = body["results"][0]
                passed = bool(result["evaluation_result"]["pass"])
                return row_id, reference_name, passed
            print(
                f"Row {row_id + 1} with {reference_name} did NOT complete on {attempt} of {max_retries}",
            )
            print(f"Received status code {response.status_code}")
            print(f"Response body: {body}")
            print(f"Request data: {data}")
        except Exception as e:
            print(
                f"Row {row_id + 1} with {reference_name} generated an exception on attempt {attempt} of {max_retries}",
            )
            print(f"Exception message: {e}")
            print(f"Request data: {data}")

        # Back off before retrying; the original slept unconditionally, adding
        # a pointless 2 s delay after the final failed attempt.
        if attempt < max_retries:
            time.sleep(2)

    print(f"All {max_retries} attempts failed for row {row_id + 1}")
    return row_id, None, None


from concurrent.futures import ThreadPoolExecutor, as_completed

NUM_WORKERS = 10  # Number of workers to use
NUM_RETRIES = 3  # Number of retries for each row
TIMEOUT = 10  # Timeout for each request

# One evaluation request is submitted per (row, evaluator config) pair.
# "profile_name" selects which evaluator profile to run.
evaluator_configs = [
    {
        "evaluator": "custom",
        "profile_name": "TODO",
    },
]

print(f"Number of rows: {df.shape[0]}")

# Count of completed (row, evaluator) futures; drives the progress log below.
num_completed = 0

with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
    # Fan out: submit one future per (row, evaluator config) pair, remembering
    # which pair each future belongs to so results can be written back.
    pending = {}
    for row_id, row in df.iterrows():
        for evaluator_config in evaluator_configs:
            fut = executor.submit(
                process_row_with_evaluator, row_id, row, evaluator_config, NUM_RETRIES
            )
            pending[fut] = (row_id, evaluator_config)

    # Collect results as they finish, in completion order.
    for fut in as_completed(pending):
        row_id, evaluator_config = pending[fut]
        try:
            _, reference_name, passed = fut.result(timeout=TIMEOUT)
            # A (None, None) pair means every retry failed; skip the write.
            if reference_name is not None and passed is not None:
                df.at[row_id, reference_name] = passed
            print(
                f"Completed future result from row {row_id + 1} with {reference_name}"
            )
            num_completed += 1
        except Exception as e:
            print(f"Issue with future result from row {row_id + 1}")
            print(f"Exception: {e}")

        # Periodic progress log, once every 10 fully-evaluated rows.
        if num_completed % (10 * len(evaluator_configs)) == 0:
            print(f"Completed {num_completed // len(evaluator_configs)} rows")