Percival
LangChain Integration
Colab Notebook
pip install patronus
pip install pydantic langchain_openai langgraph langchain_core
pip install openinference-instrumentation-langchain
pip install opentelemetry-instrumentation-threading
pip install opentelemetry-instrumentation-asyncio
Once installed, try the following example:
# Typing helpers and LangChain / LangGraph building blocks.
from typing import Literal, Dict, List, Any
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage, FunctionMessage
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph
from langchain_core.tools import tool
from pydantic import BaseModel, Field
import json
# OpenInference/OpenTelemetry instrumentors that feed traces into Patronus.
from openinference.instrumentation.langchain import LangChainInstrumentor
from opentelemetry.instrumentation.threading import ThreadingInstrumentor
from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
import patronus
# Initialize Patronus tracing before any LangChain calls so they are captured.
patronus.init(integrations=[LangChainInstrumentor(), ThreadingInstrumentor(), AsyncioInstrumentor()])
import os
from google.colab import userdata
# Pull the OpenAI API key out of Colab's secret store into the environment.
os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI_API_KEY")
@tool
def get_weather(city: str) -> str:
    """Report the current weather for *city*.

    This is a stub tool for the demo: it always reports sunny weather
    rather than calling a real weather service.

    Args:
        city: Name of the city to report on.

    Returns:
        A one-sentence weather description for the city.
    """
    return "The weather in {} is sunny".format(city)
class MessagesState(BaseModel):
    """State for the manager-weather agent workflow."""

    # Full conversation history, accumulated across agent turns.
    messages: List[BaseMessage] = Field(default_factory=list)
    # Which node should act next; the router reads this ("manager" or "weather").
    current_agent: str = Field(default="manager")
# One chat model per agent role; temperature=0 for repeatable demo output.
manager_model = ChatOpenAI(temperature=0, model="gpt-4o")
weather_model = ChatOpenAI(temperature=0, model="gpt-4o")
tools = [get_weather]
# NOTE(review): tools_dict and weather_model_with_tools are built here but
# never referenced below — presumably kept for extension; confirm before removing.
tools_dict = {tool.name: tool for tool in tools}
weather_model_with_tools = weather_model.bind_tools(tools)
def manager_agent(state: MessagesState) -> Dict[str, Any]:
    """Manager node: answer directly, or hand weather questions off.

    Invokes the manager model on the conversation so far.  If the model's
    reply mentions both "weather" and "in", the turn is delegated to the
    weather agent; otherwise the reply is appended and control stays here.

    Returns:
        Partial-state dict with the updated messages and next agent name.
    """
    history = state.messages
    reply = manager_model.invoke(history)
    lowered = reply.content.lower()
    # Crude delegation heuristic on the model's own wording.
    if "weather" in lowered and "in" in lowered:
        handoff = AIMessage(content=f"I'll check the weather for you. Delegating to weather agent.")
        return {"messages": history + [handoff], "current_agent": "weather"}
    return {"messages": history + [reply], "current_agent": "manager"}
# Define the weather agent node using a simpler approach
def weather_agent(state: MessagesState) -> Dict[str, Any]:
    """Weather node: extract a city from the latest human query and report.

    City detection is heuristic: first match against a small list of common
    cities, then fall back to the first word after "weather in" (with
    trailing punctuation stripped).  Control always returns to the manager.

    Returns:
        Partial-state dict with the appended AI reply and
        current_agent set back to "manager".
    """
    messages = state.messages
    human_queries = [msg for msg in messages if isinstance(msg, HumanMessage)]
    if not human_queries:
        return {"messages": messages + [AIMessage(content="I need a query about weather.")],
                "current_agent": "manager"}
    query = human_queries[-1].content
    try:
        city_match = None
        # Common cities that might be mentioned
        common_cities = ["Paris", "London", "New York", "Tokyo", "Berlin", "Rome", "Madrid"]
        for city in common_cities:
            if city.lower() in query.lower():
                city_match = city
                break
        if city_match is None and "weather in " in query.lower():
            # Fallback: take the first word after "weather in", stripping
            # trailing punctuation so "weather in Oslo?" yields "Oslo"
            # (the original kept the "?" and could IndexError on an empty tail).
            tail = query.lower().split("weather in ", 1)[1].strip()
            words = tail.split()
            if words:
                city_match = words[0].strip(".,!?;:").capitalize()
        if city_match:
            weather_result = get_weather.invoke(city_match)
            weather_response = f"I checked the weather for {city_match}. {weather_result}"
        else:
            weather_response = "I couldn't identify a specific city in your query."
        return {
            "messages": messages + [AIMessage(content=f"Weather Agent: {weather_response}")],
            "current_agent": "manager"
        }
    except Exception as e:
        # Best-effort: surface the failure in-band rather than crashing the graph.
        error_message = f"I encountered an error while checking the weather: {str(e)}"
        return {
            "messages": messages + [AIMessage(content=f"Weather Agent: {error_message}")],
            "current_agent": "manager"
        }
def router(state: MessagesState) -> Literal["manager", "weather", END]:
    """Decide which node runs next, or end the graph.

    Hard-stops once the message list exceeds 10 entries to avoid agent
    ping-pong.  A manager turn ends the run unless its last message
    announced a delegation to the weather agent.
    """
    # Prevent infinite loops
    if len(state.messages) > 10:
        return END
    agent = state.current_agent
    if agent == "weather":
        return "weather"
    if agent == "manager":
        last = state.messages[-1] if state.messages else None
        # End unless the manager just announced a hand-off.
        if isinstance(last, AIMessage) and "delegating to weather agent" not in last.content.lower():
            return END
    return "manager"
# Build the two-node state graph: manager is the entry point, and the
# router decides after each node whether to continue or END.
workflow = StateGraph(MessagesState)
workflow.add_node("manager", manager_agent)
workflow.add_node("weather", weather_agent)
workflow.set_entry_point("manager")
workflow.add_conditional_edges("manager", router)
workflow.add_conditional_edges("weather", router)
# MemorySaver persists state per thread_id across invocations.
checkpointer = MemorySaver()
app = workflow.compile(checkpointer=checkpointer)
def run_workflow(query: str):
    """Run the compiled graph on one human query and print the transcript.

    Args:
        query: The user's question, seeded as the initial HumanMessage.

    Returns:
        The graph's final state (mapping with "messages"/"current_agent").
    """
    seed = MessagesState(
        messages=[HumanMessage(content=query)],
        current_agent="manager",
    )
    # thread_id keys the MemorySaver checkpoint for this conversation.
    config = {"configurable": {"thread_id": "weather_demo_thread"}}
    final_state = app.invoke(seed, config=config)
    for message in final_state["messages"]:
        if isinstance(message, HumanMessage):
            prefix = "Human"
        elif isinstance(message, AIMessage):
            prefix = "AI"
        else:
            prefix = "Other"
        print(f"{prefix}: {message.content}")
    return final_state
@patronus.traced()
def main():
    """Entry point: traced end-to-end run of the demo weather workflow."""
    final_state = run_workflow("What is the weather in Paris?")
    return final_state
main()