First of all, let's see how I set up my tool, model, agent, callback handler and AgentExecutor:
The tool:
from datetime import datetime
from typing import Literal, Annotated

from langchain_core.tools import InjectedToolArg, ToolException
from langchain_core.tools.structured import StructuredTool
from pydantic import BaseModel, Field, PlainSerializer


class HumanInTheLoopToolException(ToolException):
    """Exception raised when the conversation needs to be rerouted to a human."""

    reason: str
    conversation_id: int

    def __init__(self, *, reason: str, conversation_id: int) -> None:
        self.reason = reason
        self.conversation_id = conversation_id
        super().__init__("Human in the loop due to: " + reason)


class HumanInTheLoopInput(BaseModel):
    """Input for the human_in_the_loop tool."""

    reason: str = Field(..., description="Reason for using this tool")
    conversation_id: Annotated[
        int,
        PlainSerializer(int, return_type=int),
        InjectedToolArg,
        Field(
            ...,
            description="Conversation ID used to retrieve the account ID to "
            "book the appointment",
        ),
    ]


async def human_in_the_loop_function(reason: str, conversation_id: int) -> None:
    """Reroute the conversation to a human.

    Args:
        reason (str): reason for raising a human in the loop
        conversation_id (int): ID of the conversation being rerouted

    Raises:
        HumanInTheLoopToolException: always raised when using this tool
    """
    raise HumanInTheLoopToolException(reason=reason, conversation_id=conversation_id)
# The library used for inversion of control here is: /
# (inject and Provide come from that library; EventBus and IOCContainer are ours)
from typing import Any


@inject
def handle_human_in_the_loop_tool_exceptions(
    exception: ToolException,
    event_bus: EventBus = Provide[IOCContainer.event_bus],
) -> str:
    """Handle errors raised by the human_in_the_loop tool.

    It uses the Slack service to advise the advisors in the HITL Slack
    channel.

    Args:
        exception (ToolException): the LangChain ToolException

    Returns:
        str: the canned answer the agent should relay instead of a message
    """
    if isinstance(exception, HumanInTheLoopToolException):
        event_data: dict[str, Any] = {
            "type": "human_in_the_loop",
            "conversation_id": exception.conversation_id,
            "reason": exception.reason,
        }
        event_bus.emit("human_in_the_loop", event_data)
    return "Answer this : Je passe ma conversation à un humain"
human_in_the_loop = StructuredTool.from_function(
    name="human_in_the_loop",
    description="""
    Censored prompt (but imagine that in this case the tool is always called)
    """,
    args_schema=HumanInTheLoopInput,
    coroutine=human_in_the_loop_function,
    handle_tool_error=handle_human_in_the_loop_tool_exceptions,
)
As you can see, I follow the recommendation from LangChain v0.3: /
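For reference, here is my understanding of the documented InjectedToolArg pattern when you run the tool-calling loop by hand rather than through an AgentExecutor. This is a sketch under my assumptions, not my production code (messages is a placeholder for the chat history):

# Bind the tool, then inject conversation_id into each tool call
# before executing it; the model never sees the injected argument
model_with_tools = model.bind_tools([human_in_the_loop])
ai_msg = await model_with_tools.ainvoke(messages)
for tool_call in ai_msg.tool_calls:
    # Injection happens here, at runtime, outside the model's control
    tool_call["args"]["conversation_id"] = conversation_id
    tool_msg = await human_in_the_loop.ainvoke(tool_call)

My question is essentially how to get the AgentExecutor to perform that injection step for me.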
The model:
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_community.callbacks.promptlayer_callback import (
    PromptLayerCallbackHandler,
)
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.utils.utils import convert_to_secret_str
from langchain_mistralai import ChatMistralAI
from langchain_openai import AzureChatOpenAI, ChatOpenAI

model = ChatOpenAI(
    name=model,
    model=model,
    temperature=temperature,
    callbacks=[],
    # API key redacted here, but the model works fine on its own
    api_key=convert_to_secret_str(self._openai_api_key),
    max_tokens=max_tokens,
)
The agent:
from langchain.agents import AgentExecutor, create_openai_tools_agent

agent = create_openai_tools_agent(
    llm=model,
    tools=[human_in_the_loop],
    prompt=ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            MessagesPlaceholder("chat_history"),
            MessagesPlaceholder("agent_scratchpad"),
        ]
    ),
)
The callback handler:
from __future__ import annotations

import ast
import logging
from typing import TYPE_CHECKING, Any

from langchain_core.callbacks import AsyncCallbackHandler
from langchain_core.messages import AIMessage, BaseMessage
from langchain_core.outputs.chat_generation import ChatGeneration
from langchain_core.outputs.llm_result import LLMResult

from ...models import CustomerioEvent, Message

if TYPE_CHECKING:
    from src.services.entity_manager import EntityManager


class ChatbotCallbackHandler(AsyncCallbackHandler):
    """Async callback handler that archives messages and tool events in the DB."""

    def __init__(
        self,
        entity_manager: EntityManager,
        conversation_id: int,
        chatbot_name: str,
    ) -> None:
        self.entity_manager = entity_manager
        self.conversation_id = conversation_id
        self.chatbot_name = chatbot_name
        self.logger = logging.getLogger(__name__)
        super().__init__()

    async def on_chat_model_start(
        self,
        serialized: dict[str, Any],
        messages: list[list[BaseMessage]],
        **kwargs: Any,
    ) -> Any:
        """Run when the chat model starts running."""
        # We don't need to do anything here, but on_chat_model_start is
        # required to be implemented

    async def on_llm_end(self, response: LLMResult, **_: Any) -> Any:
        """Run when the LLM ends running."""
        self.logger.info("on_llm_end: %s", response)
        # Get the first LLM generation
        gen = response.generations[0][0]
        # If the generation is not a ChatGeneration or the message is not an
        # AIMessage, do nothing
        if not isinstance(gen, ChatGeneration) or not isinstance(
            gen.message, AIMessage
        ):
            return
        # Extract the tool calls, build the message content from them
        # and store it in the database
        tool_calls = gen.message.tool_calls
        role = "tool"
        message_content = str(
            [{"name": tool["name"], "args": tool["args"]} for tool in tool_calls]
        )
        if len(tool_calls) > 0:
            self.logger.info(
                "Storing message in database for conversation %s with "
                "the following details: role=%s, content=%s",
                self.conversation_id,
                role,
                message_content,
            )
            await self.entity_manager.save(
                Message(
                    conversation_id=self.conversation_id,
                    content=message_content,
                    role=role,
                )
            )
        # Used for DB archiving; this part works. For testing, the save
        # can be replaced by a placeholder action

    async def on_tool_start(
        self, serialized: dict[str, Any], input_str: str, **kwargs: Any
    ) -> Any:
        """Run when a tool starts running."""
        tool_name = serialized["name"]
        self.logger.info("Tool %s started with input: %s", tool_name, input_str)
        # input_str is the repr of the input dict; literal_eval is safer than
        # swapping quotes and calling json.loads, which breaks on apostrophes
        args = ast.literal_eval(input_str)
        data: dict[str, Any] = {
            "args": args,
            "status": "start",
            "chatbot": self.chatbot_name,
        }
        await self.entity_manager.save(
            CustomerioEvent(
                conversation_id=self.conversation_id,
                name=f"bot_{tool_name}",
                data=str(data),
            )
        )
        # Same here

    async def on_tool_end(self, output: Any, **kwargs: Any) -> Any:
        """Run when a tool ends running."""
        tool_name = kwargs.get("name")
        if not isinstance(tool_name, str):
            self.logger.error("Unexpected tool name: %s", tool_name)
            tool_name = str(tool_name)
        self.logger.info("Tool %s ended with output: %s", tool_name, output)
        data: dict[str, Any] = {"output": output, "status": "end"}
        self.logger.info("Data from LLM on_tool_end: %s", data)
        if tool_name == "book_slot":
            data["success"] = False
        await self.entity_manager.save(
            CustomerioEvent(
                conversation_id=self.conversation_id,
                name=f"bot_{tool_name}",
                data=str(data),
            )
        )
        # Same thrice

    async def on_tool_error(self, error: BaseException, **_: Any) -> Any:
        """Run when a tool errors."""
        self.logger.error("Tool error: %s", repr(error))
        raise error
The AgentExecutor:
entity_manager = IOCContainer.entity_manager()  # Used for the DB in the agent executor
conversation_id = 123

agent_executor = AgentExecutor.from_agent_and_tools(
    agent=agent,
    tools=[human_in_the_loop],
    callbacks=[ChatbotCallbackHandler(entity_manager, conversation_id, "bot_name")],
    verbose=True,
    return_intermediate_steps=True,
    stream_runnable=False,
)
I tried two things to pass the conversation_id variable down to the tool call:
First, I passed it when invoking the agent executor:
output = (
    await agent_executor.ainvoke(
        {
            "chat_history": langchain_messages,
            "conversation_id": conversation_id,
        }
    )
)["output"]
Second, I tried assigning to kwargs in the on_tool_start method of my custom CallbackHandler, like this:
def on_tool_start(self, tool, input_str, **kwargs):
    if isinstance(kwargs, dict) and "conversation_id" not in kwargs:
        kwargs["conversation_id"] = self.conversation_id
        print(f"Injected conversation_id: {kwargs.get('conversation_id')} into tool: {tool.name}")
Either way, conversation_id is never defined when the tool is called. Here's the Python exception raised by LangChain:
pydantic_core._pydantic_core.ValidationError: 1 validation error for HumanInTheLoopInput
conversation_id
  Field required [type=missing, input_value={'reason': 'user is insulting or using inappropriate language'}, input_type=dict]
I also tried launching the tool after removing the InjectedToolArg annotation from my schema. The tool ran successfully, but my conversation_id was replaced by one the LLM hallucinated.
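That variant just drops the annotation from the schema, i.e. roughly this (hypothetical class name, for illustration only):

class HumanInTheLoopInputNotInjected(BaseModel):
    """Variant schema without InjectedToolArg (hypothetical, for illustration)."""

    reason: str = Field(..., description="Reason for using this tool")
    conversation_id: int = Field(
        ...,
        description="Conversation ID used to retrieve the account ID to "
        "book the appointment",
    )

This matches the trace below, where conversation_id is a required parameter the model fills in itself.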
Execution trace of the AgentExecutor when it worked:
{
  "method": "post",
  "url": "/chat/completions",
  "files": null,
  "json_data": {
    "messages": [
      {
        "content": "Bonjour Romain, Je vous écris à propos de votre projet d'achat à Nice pour lequel vous avez fait une simulation sur pretto.fr. Quand êtes-vous disponible pour un appel afin d'en parler avec un expert crédit ? Arthur de Pretto",
        "role": "assistant"
      },
      {
        "content": "Connard! T'es un robot ?!",
        "role": "user"
      },
      {
        "content": null,
        "role": "assistant",
        "tool_calls": [
          {
            "type": "function",
            "id": "call_8KBB5JGBVtkj7WJUEXqCRdZ0",
            "function": {
              "name": "human_in_the_loop",
              "arguments": "{\"reason\": \"user is insulting or using inappropriate language\", \"conversation_id\": 1234567}"
            }
          }
        ]
      },
      {
        "content": "Answer this: \"Je passe votre conversation à un humain\" ",
        "role": "tool",
        "tool_call_id": "call_8KBB5JGBVtkj7WJUEXqCRdZ0"
      }
    ],
    "model": "gpt-4-turbo",
    "n": 1,
    "stream": false,
    "temperature": 0.2,
    "tools": [
      {
        "type": "function",
        "function": {
          "name": "human_in_the_loop",
          "description": "Launch this tool everytime (I censored the description)",
          "parameters": {
            "properties": {
              "reason": {
                "description": "Reason for using this tool",
                "type": "string"
              },
              "conversation_id": {
                "description": "Conversation ID used to retrieve the account ID to book the appointment",
                "type": "integer"
              }
            },
            "required": ["reason", "conversation_id"],
            "type": "object"
          }
        }
      }
    ]
  }
}
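Note that conversation_id still appears in "properties" and "required" in the schema sent to OpenAI, which is presumably why the LLM fills it in itself. If it helps, this is how I would inspect locally what the model is shown versus what the tool expects at invocation time (assuming tool_call_schema behaves as documented, i.e. it excludes InjectedToolArg fields):

# What the model should see: InjectedToolArg fields excluded
print(human_in_the_loop.tool_call_schema.model_json_schema())
# What the tool needs at runtime: all fields, including conversation_id
print(human_in_the_loop.get_input_schema().model_json_schema())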
Is there something I don't get about how to pass user-defined arguments to tools? Thanks in advance.