Batch Evaluation Script
This script demonstrates how to use concurrent calls with the Patronus API to parallelize evaluation requests. We provide an example with a pandas DataFrame, but you can adapt the script to any dataset to run batch evaluations in less time.
For more information on running concurrent calls in Python, see the concurrent.futures documentation in the Python standard library.
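For context, a single synchronous call to the evaluate endpoint looks roughly like the sketch below. The payload fields mirror the ones used in the full script that follows; the key, profile name, and input/output values are placeholders.

import requests

response = requests.post(
    "https://api.patronus.ai/v1/evaluate",
    headers={"Content-Type": "application/json", "X-API-KEY": "TODO"},
    json={
        "evaluators": [{"evaluator": "custom", "profile_name": "TODO"}],
        "evaluated_model_input": "What is the capital of France?",
        "evaluated_model_output": "Paris is the capital of France.",
    },
)
print(response.json())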
import time
import requests
import pandas as pd
PATRONUS_API_KEY = "TODO"  # Your Patronus API key
run_id = "run-at-" + time.strftime("%Y%m%d-%H%M%S")  # Unique tag for this batch run
df = pd.read_csv("TODO")  # CSV with "question" and "generated_text" columns
headers = {
"Content-Type": "application/json",
"X-API-KEY": PATRONUS_API_KEY,
}
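Rather than hard-coding the key, you may prefer to read it from an environment variable; a minimal sketch, assuming the variable is named PATRONUS_API_KEY:

import os

PATRONUS_API_KEY = os.environ.get("PATRONUS_API_KEY", "TODO")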
# Send a single evaluation request; returns the response, or None if no
# response was received (e.g. a connection error)
def send_request(data):
    response = None
    try:
        response = requests.post(
            "https://api.patronus.ai/v1/evaluate",
            headers=headers,
            json=data,
            timeout=30,  # seconds; keeps a stalled request from hanging a worker
        )
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        print(f"HTTP error occurred: {e}")
        print(f"Response content: {response.content}")
    except requests.exceptions.RequestException as e:
        print(f"Other error occurred: {e}")
    return response
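For reference, this is the shape of the fields the rest of the script reads from a successful response, inferred from the parsing code below; real responses include additional fields.

# Fragment of a successful response body, as consumed by this script:
# {
#     "results": [
#         {
#             "status": "success",
#             "evaluation_result": {"pass": true}
#         }
#     ]
# }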
def process_row_with_evaluator(row_id, row, evaluator_config, max_retries=3):
question = str(row["question"])
text = str(row["generated_text"])
data = {
"evaluators": [evaluator_config],
"evaluated_model_input": question,
"evaluated_model_output": text,
"app": "TODO",
"tags": {"run-id": run_id},
"capture": "all", # Set to "all" to capture all logs
"explain": False, # Set to True to generate explanations
}
    reference_name = evaluator_config.get("profile_name", evaluator_config["evaluator"])
attempt = 0
while attempt < max_retries:
attempt += 1
try:
response = send_request(data)
            if (
                response is not None
                and response.status_code == 200
                and response.json()["results"][0]["status"] == "success"
            ):
result = response.json()["results"][0]
passed = bool(result["evaluation_result"]["pass"])
# print(
# f"Row {row_id + 1} with {reference_name} completed on attempt {attempt}",
# )
return row_id, reference_name, passed
            else:
                print(
                    f"Row {row_id + 1} with {reference_name} did NOT complete on attempt {attempt} of {max_retries}",
                )
                if response is not None:
                    print(f"Received status code {response.status_code}")
                    print(f"Response body: {response.json()}")
                print(f"Request data: {data}")
except Exception as e:
print(
f"Row {row_id + 1} with {reference_name} generated an exception on attempt {attempt} of {max_retries}",
)
print(f"Exception message: {e}")
print(f"Request data: {data}")
        time.sleep(2)  # Brief pause before the next attempt
print(f"All {max_retries} failed for row {row_id + 1}")
return row_id, None, None
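If you run into rate limits, the fixed two-second pause between retries can be replaced with exponential backoff and jitter. A minimal sketch; the delay schedule is an assumption, not something the Patronus API prescribes:

import random

def backoff_sleep(attempt, base=2.0, cap=30.0):
    # Sleep base * 2^(attempt - 1) seconds plus up to 1s of jitter, capped at `cap`
    delay = min(cap, base * (2 ** (attempt - 1)))
    time.sleep(delay + random.uniform(0, 1))

Swapping time.sleep(2) in the retry loop for backoff_sleep(attempt) spreads retries out as failures accumulate.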
from concurrent.futures import ThreadPoolExecutor, as_completed
NUM_WORKERS = 10  # Number of concurrent worker threads
NUM_RETRIES = 3  # Number of attempts for each row
TIMEOUT = 10  # Timeout (seconds) when collecting each future's result
evaluator_configs = [
{
"evaluator": "custom",
"profile_name": "TODO",
},
]
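Because the fan-out below creates one task per (row, evaluator) pair, scoring each row against several evaluators at once only requires listing more configs. The profile names in this sketch are hypothetical placeholders:

evaluator_configs = [
    {"evaluator": "custom", "profile_name": "my-answer-relevance-profile"},
    {"evaluator": "custom", "profile_name": "my-hallucination-profile"},
]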
print(f"Number of rows: {df.shape[0]}")
num_completed = 0
with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
future_to_row = {
executor.submit(
process_row_with_evaluator, row_id, row, evaluator_config, NUM_RETRIES
): (
row_id,
evaluator_config,
)
for row_id, row in df.iterrows()
for evaluator_config in evaluator_configs
}
for future in as_completed(future_to_row):
row_id, evaluator_config = future_to_row[future]
try:
_, reference_name, passed = future.result(timeout=TIMEOUT)
if reference_name is not None and passed is not None:
df.at[row_id, reference_name] = passed
print(
f"Completed future result from row {row_id + 1} with {reference_name}"
)
num_completed += 1
except Exception as e:
print(f"Issue with future result from row {row_id + 1}")
print(f"Exception: {e}")
        if num_completed > 0 and num_completed % (10 * len(evaluator_configs)) == 0:
            print(f"Completed {num_completed // len(evaluator_configs)} rows")