Our docs got a refresh! Check out the new content and improved navigation. For detailed API reference see our Python SDK docs and TypeScript SDK.
Description
PercivalIntegrations

Langchain Integration

Colab Notebook

pip install patronus
pip install pydantic langchain_openai langgraph langchain_core
pip install openinference-instrumentation-langchain
pip install opentelemetry-instrumentation-threading
pip install opentelemetry-instrumentation-asyncio

Once installed, try the following example:

from typing import Literal, Dict, List, Any
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage, FunctionMessage
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph
from langchain_core.tools import tool
from pydantic import BaseModel, Field
import json
 
from openinference.instrumentation.langchain import LangChainInstrumentor
from opentelemetry.instrumentation.threading import ThreadingInstrumentor
from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
 
import patronus
patronus.init(integrations=[LangChainInstrumentor(), ThreadingInstrumentor(), AsyncioInstrumentor()])
 
import os
from google.colab import userdata
 
os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI_API_KEY")
 
@tool
def get_weather(city: str) -> str:
    """Get the current weather in a given city.
 
    Args:
        city: The name of the city to get weather for.
 
    Returns:
        A string describing the current weather in the city.
    """
    return f"The weather in {city} is sunny"
 
class MessagesState(BaseModel):
    """State for the manager-weather agent workflow."""
    messages: List[BaseMessage] = Field(default_factory=list)
    current_agent: str = Field(default="manager")
 
manager_model = ChatOpenAI(temperature=0, model="gpt-4o")
weather_model = ChatOpenAI(temperature=0, model="gpt-4o")
 
tools = [get_weather]
tools_dict = {tool.name: tool for tool in tools}
 
weather_model_with_tools = weather_model.bind_tools(tools)
 
def manager_agent(state: MessagesState) -> Dict[str, Any]:
    messages = state.messages  # Access as attribute
    # Get response from the manager model
    response = manager_model.invoke(messages)
 
    # Check if the manager wants to use the weather agent
    manager_text = response.content.lower()
    if "weather" in manager_text and "in" in manager_text:
        # Delegate to weather agent
        return {"messages": messages + [AIMessage(content=f"I'll check the weather for you. Delegating to weather agent.")],
                "current_agent": "weather"}
 
    return {"messages": messages + [response], "current_agent": "manager"}
 
# Define the weather agent node using a simpler approach
def weather_agent(state: MessagesState) -> Dict[str, Any]:
    messages = state.messages  # Access as attribute
    human_queries = [msg for msg in messages if isinstance(msg, HumanMessage)]
    if not human_queries:
        return {"messages": messages + [AIMessage(content="I need a query about weather.")],
                "current_agent": "manager"}
 
    query = human_queries[-1].content
 
    try:
        weather_prompt = f"Extract the city name from this query and provide the weather: '{query}'"
 
        city_match = None
 
        # Common cities that might be mentioned
        common_cities = ["Paris", "London", "New York", "Tokyo", "Berlin", "Rome", "Madrid"]
        for city in common_cities:
            if city.lower() in query.lower():
                city_match = city
                break
 
        if city_match:
            weather_result = get_weather.invoke(city_match)
            weather_response = f"I checked the weather for {city_match}. {weather_result}"
        else:
            if "weather in " in query.lower():
                parts = query.lower().split("weather in ")
                if len(parts) > 1:
                    city_match = parts[1].strip().split()[0].capitalize()
                    weather_result = get_weather.invoke(city_match)
                    weather_response = f"I checked the weather for {city_match}. {weather_result}"
                else:
                    weather_response = "I couldn't identify a specific city in your query."
            else:
                weather_response = "I couldn't identify a specific city in your query."
 
        return {
            "messages": messages + [AIMessage(content=f"Weather Agent: {weather_response}")],
            "current_agent": "manager"
        }
    except Exception as e:
        error_message = f"I encountered an error while checking the weather: {str(e)}"
        return {
            "messages": messages + [AIMessage(content=f"Weather Agent: {error_message}")],
            "current_agent": "manager"
        }
 
def router(state: MessagesState) -> Literal["manager", "weather", END]:
    if len(state.messages) > 10:  # Prevent infinite loops
        return END
 
    # Route based on current_agent
    if state.current_agent == "weather":
        return "weather"
    elif state.current_agent == "manager":
        # Check if the last message is from the manager and indicates completion
        if len(state.messages) > 0 and isinstance(state.messages[-1], AIMessage):
            if not "delegating to weather agent" in state.messages[-1].content.lower():
                return END
 
    return "manager"
 
workflow = StateGraph(MessagesState)
workflow.add_node("manager", manager_agent)
workflow.add_node("weather", weather_agent)
 
workflow.set_entry_point("manager")
workflow.add_conditional_edges("manager", router)
workflow.add_conditional_edges("weather", router)
 
checkpointer = MemorySaver()
app = workflow.compile(checkpointer=checkpointer)
 
def run_workflow(query: str):
    initial_state = MessagesState(
        messages=[HumanMessage(content=query)],
        current_agent="manager"
    )
 
    config = {"configurable": {"thread_id": "weather_demo_thread"}}
    final_state = app.invoke(initial_state, config=config)
 
    for message in final_state["messages"]:
        if isinstance(message, HumanMessage):
            print(f"Human: {message.content}")
        elif isinstance(message, AIMessage):
            print(f"AI: {message.content}")
        else:
            print(f"Other: {message.content}")
 
    return final_state
 
@patronus.traced()
def main():
  final_state = run_workflow("What is the weather in Paris?")
  return final_state
 
main()

On this page

No Headings