LangSmith <> ActionWeaver

Integrating LangSmith tracing into a multi-step reasoning bot orchestrated by ActionWeaver is straightforward, because LangSmith keeps tracing and debugging of LLM applications simple.

ActionWeaver, a framework built around function calling, lends itself naturally to integration with LangSmith.

[28]:
import os
from collections import defaultdict
from datetime import datetime
from typing import List
from uuid import UUID, uuid4

from pydantic import BaseModel, Field, PrivateAttr, validate_call

from actionweaver import action
from actionweaver.utils import DEFAULT_ACTION_SCOPE
from actionweaver.llms import wrap, ExceptionHandler, ExceptionAction, ChatLoopInfo, Continue, Return

from langsmith.run_helpers import traceable

from openai import AzureOpenAI, OpenAI

Let’s initialize an OpenAI client.

[12]:
# Setup OpenAI llm

# Azure OpenAI
MODEL="gpt-4-32k"
client = AzureOpenAI(
    azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT"),
    api_key=os.getenv("AZURE_OPENAI_KEY"),
    api_version="2023-10-01-preview"
)

# OpenAI
# MODEL = "gpt-4"
# client = OpenAI()

Wrap the OpenAI client with ActionWeaver and LangSmith for traceability.

ActionWeaver simplifies the development of LLM applications by providing straightforward tools for structured data parsing, function dispatching, and orchestration.

To use ActionWeaver with tracing enabled:

  • Wrap the OpenAI client’s client.chat.completions.create method to enable OpenAI API call tracing.

  • Wrap the OpenAI client with an ActionWeaver wrapper.

  • Finally, wrap the ActionWeaver-wrapped client’s create method so that the top-level create call is traced as well.
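
The layering described by these three steps can be illustrated without any external service. The sketch below is only an analogy: `traced`, `FakeClient`, and `FakeWrapper` are hypothetical stand-ins (not LangSmith or ActionWeaver APIs) that record when each wrapped call starts and ends, showing why the outer trace encloses the inner one.

```python
from functools import wraps

calls = []  # in-memory trace log, a stand-in for what a tracing backend would record


def traced(name):
    """Hypothetical stand-in for a tracing decorator: logs start and end of a call."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            calls.append(f"start {name}")
            try:
                return fn(*args, **kwargs)
            finally:
                calls.append(f"end {name}")
        return wrapper
    return decorator


class FakeClient:
    """Hypothetical low-level client with a single create method."""
    def create(self, prompt):
        return f"echo: {prompt}"


class FakeWrapper:
    """Hypothetical stand-in for a wrapper that delegates to the client."""
    def __init__(self, client):
        self.client = client

    def create(self, prompt):
        return self.client.create(prompt)


client = FakeClient()
client.create = traced("llm_call")(client.create)      # step 1: trace the raw API call
llm = FakeWrapper(client)                               # step 2: wrap the client
llm.create = traced("actionweaver_call")(llm.create)   # step 3: trace the top-level call

result = llm.create("hi")
```

After the call, `calls` shows the outer span opening first and closing last, which mirrors the nesting one would expect to see in a trace view.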

[4]:
# Setup LangSmith Environment
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_TRACING_V2"] = "true"
project_name = "actionweaver"
os.environ["LANGCHAIN_PROJECT"] = project_name

assert os.environ["LANGCHAIN_API_KEY"]

# Apply LangSmith tracing to the original LLM client's chat completion method.
# This allows for detailed tracking of API calls to OpenAI.
client.chat.completions.create = traceable(name="llm_call", run_type="llm")(client.chat.completions.create)

# Enhance the LLM client with ActionWeaver.
llm = wrap(client)

# Track ActionWeaver wrapped create method with LangSmith tracing to monitor ActionWeaver calls.
llm.create = traceable(name="actionweaver_call", run_type="llm")(llm.create)

Let’s create an ActionWeaver action by leveraging the Search Tool from the LangChain community.

[6]:
from langchain_community.tools.google_search.tool import GoogleSearchRun
from langchain_community.utilities.google_search import GoogleSearchAPIWrapper

google_search_api = GoogleSearchRun(api_wrapper=GoogleSearchAPIWrapper())

@action(name="GoogleSearch", decorators=[traceable(run_type="tool")])
def web_search(query: str) -> str:
    """
    Perform a Google search using the provided query.
    """
    # Use the tool's run method rather than calling the tool object directly.
    return google_search_api.run(query)

Create Pydantic models for multi-step reasoning

ActionWeaver natively supports Pydantic for data validation when calling functions. In this setup, we define the models used by the reasoning bot.

Each Task model has an execute_task method, which calls the LLM with the web search tool, using the task description as the prompt and the results of its dependencies as context. The function orchestration is defined as follows:

orch = {
      DEFAULT_ACTION_SCOPE: web_search,
      web_search.name: None
  }

This orch argument defines a loop in which the LLM first calls the web search tool, the LLM API is then invoked again with the search results, and the final response is returned.
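
To make that flow concrete, here is a toy walk-through of such a mapping in plain Python. It is a simplified reading of the orch dictionary, not ActionWeaver's implementation: `DEFAULT_SCOPE`, `run_loop`, and the canned `web_search` below are hypothetical, and a real run would have the LLM choose the tool arguments and write the final answer.

```python
DEFAULT_SCOPE = "__default__"  # hypothetical stand-in for DEFAULT_ACTION_SCOPE


def web_search(query):
    """Hypothetical tool that returns a canned result instead of searching."""
    return f"results for {query!r}"


def run_loop(orch, tools, query):
    """Follow orch from the default scope until an action maps to None."""
    steps = []
    payload = query
    next_action = orch[DEFAULT_SCOPE]
    while next_action is not None:
        payload = tools[next_action](payload)  # run the action chosen for this step
        steps.append(next_action)
        next_action = orch.get(next_action)    # None means: no further actions
    steps.append("final_answer")               # the model now answers in plain text
    return steps, payload


orch = {DEFAULT_SCOPE: "GoogleSearch", "GoogleSearch": None}
steps, payload = run_loop(orch, {"GoogleSearch": web_search}, "Charles Babbage")
```

Here `steps` contains one tool call followed by the final answer, which is the two-stage loop the mapping describes.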

[34]:
# Define the task models used in multi-step reasoning
class Task(BaseModel):
    """Represents a task."""
    _uid: UUID = PrivateAttr(default_factory=uuid4)
    _created_at: datetime = PrivateAttr(default_factory=datetime.now)
    task_id: int = Field(..., description="Identifier for the task", examples=[1])
    description: str = Field(..., description="A comprehensive and standalone description of the task.", examples=["Create a Python function that takes a list of integers and returns the sum. The function should handle empty lists by returning zero."])
    dependencies: List[int] = Field([], description="Task IDs that this task depends on", examples=[[1, 2, 3]])

    @traceable(run_type="tool")
    def execute_task(self, context, llm, eh) -> str:
        messages = [
            {"role": "system", "content": context},
            {"role": "user", "content": self.description}
        ]
        response = llm.create(
          model=MODEL,
          messages=messages,
          stream=False,
          orch = {
              DEFAULT_ACTION_SCOPE: web_search,
              web_search.name: None
          },
          exception_handler = eh,
        )
        return response.choices[0].message.content

class TaskPlan(BaseModel):
    """Represents a task plan, outlining the overall problem to be solved and its subtasks."""
    description: str = Field(..., description="Description of the overall problem that needs to be solved")
    tasks: List[Task] = Field(..., description="Subtasks required to solve the problem")


Here we create an agent, TaskPlanner, that uses the LLM to perform two actions, plan_tasks_and_solve and summarize_info, through function calling.

To ensure the integrity of the inputs passed to these functions, we apply Pydantic’s validate_call for input validation. Additionally, to manage exceptions and facilitate retries, a simple ExceptionHandler is implemented.

The orchestration of function calls is managed by the orch parameter, designed as follows:

orch = {
              DEFAULT_ACTION_SCOPE: self.plan_tasks_and_solve,
              self.plan_tasks_and_solve.name: self.summarize_info,
          }

This configuration instructs the LLM to first execute the plan_tasks_and_solve action, then the summarize_info action, which condenses the results and returns them directly to the user. For further details on function orchestration, refer to the ActionWeaver documentation.

[8]:
class TaskPlanner:
    def __init__(self, llm, eh):
        self.llm = llm
        self.debug_info = {}
        self.eh = eh
        self.messages = [
            {"role": "system", "content": "You are a task planner, approach the question by breaking it into smaller tasks and addressing each step systematically"},
        ]

    def __call__(self, query:str) -> str:
        self.messages.append({"role": "user", "content": query})
        response = self.llm.create(
          model=MODEL,
          messages=self.messages,
          stream=False,
          exception_handler = self.eh,
          orch = {
              DEFAULT_ACTION_SCOPE: self.plan_tasks_and_solve,
              self.plan_tasks_and_solve.name: self.summarize_info,
          }
        )
        return response

    @action(name="Summarize", stop=True, decorators=[traceable(run_type="tool")])
    @validate_call
    def summarize_info(self, content: str) -> str:
        """Condense the information to provide a concise response to the question."""
        return content

    @action(name="CreateAndExecutePlan", decorators=[traceable(run_type="tool")])
    @validate_call
    def plan_tasks_and_solve(self, task_plan: TaskPlan) -> str:
        """Create and execute a plan for complex problem"""
        self.debug_info["task_plan"] =  task_plan

        id2results = {}
        id2tasks = {}

        from graphlib import TopologicalSorter
        graph = defaultdict(set)
        for task in task_plan.tasks:
            graph[task.task_id].update(task.dependencies)
            id2tasks[task.task_id] = task

        # topo sort
        ts = TopologicalSorter(graph)
        tasks_in_order = [*ts.static_order()]

        # execute tasks
        for task_id in tasks_in_order:
            task = id2tasks[task_id]
            # Pair each dependency's own description with its result.
            context = '\n'.join([f"{id2tasks[dep_id].description}:{id2results[dep_id]}\n" for dep_id in task.dependencies])
            res = task.execute_task(context, self.llm, self.eh)
            id2results[task.task_id] = res

        self.debug_info["id2results"] =  id2results
        # Return a single string, matching the declared return type.
        return "\n".join(id2results[task_id] for task_id in tasks_in_order)

class ExceptionRetryHandler(ExceptionHandler):
    def __init__(self, retry=2):
        self.retry = retry

    @traceable(run_type="tool")
    def handle_exception(self, e: Exception, info: ChatLoopInfo) -> ExceptionAction:
        if self.retry:
            self.retry -= 1
            return Continue(functions=info.context['functions'])
        raise e
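
The dependency ordering inside plan_tasks_and_solve relies on the standard library's graphlib.TopologicalSorter. The standalone sketch below shows that step in isolation; the task IDs and the `has_cycle` helper are hypothetical and only serve the illustration.

```python
from collections import defaultdict
from graphlib import CycleError, TopologicalSorter

# Hypothetical plan: task_id -> IDs of the tasks it depends on.
dependencies = {1: [], 2: [1], 3: [1, 2]}

# Build the graph in the shape TopologicalSorter expects: node -> set of predecessors.
graph = defaultdict(set)
for task_id, deps in dependencies.items():
    graph[task_id].update(deps)

# static_order() yields every task after all of its dependencies.
order = list(TopologicalSorter(graph).static_order())


def has_cycle(deps):
    """Return True if the dependency mapping cannot be ordered."""
    try:
        list(TopologicalSorter(deps).static_order())
        return False
    except CycleError:
        return True
```

Because every task appears after its dependencies, the result of each dependency is already available when a task's context is built. A plan with circular dependencies raises CycleError, which a planner may want to catch before executing anything.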
[14]:
eh = ExceptionRetryHandler(2)
task_planner = TaskPlanner(llm, eh)
[15]:
response = task_planner("""Discover the establishment year of the university attended by the individual credited with inventing the computer.""")
[33]:
print(response)
Charles Babbage, often credited with inventing the computer, studied at Cambridge University. Established in 1209, it's one of the world's oldest universities.

Let’s examine how the final outcome is generated by tracing through LangSmith!

[Image: LangSmith trace of the run above]
